From d964eae817b538c532996af0b41993d128fa5a5c Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 24 Apr 2008 17:49:03 +0000 Subject: QPID-832 : Fix eol-style git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@651325 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQSessionAdapter.java | 52 +- .../qpid/client/AMQUndefinedDestination.java | 80 +- .../org/apache/qpid/client/CustomJMSXProperty.java | 132 +- .../apache/qpid/client/TemporaryDestination.java | 76 +- .../qpid/client/failover/FailoverNoopSupport.java | 150 +- .../failover/FailoverProtectedOperation.java | 98 +- .../qpid/client/failover/FailoverRetrySupport.java | 270 ++-- .../handler/AccessRequestOkMethodHandler.java | 72 +- .../client/handler/ClientMethodDispatcherImpl.java | 1056 ++++++------- .../handler/ClientMethodDispatcherImpl_0_9.java | 310 ++-- .../handler/ClientMethodDispatcherImpl_8_0.java | 170 +-- .../client/message/AbstractBytesTypedMessage.java | 1604 ++++++++++---------- .../qpid/client/message/JMSHeaderAdapter.java | 1104 +++++++------- .../state/AMQMethodNotImplementedException.java | 64 +- .../src/main/java/org/apache/qpid/jms/Message.java | 56 +- .../test/unit/basic/InvalidDestinationTest.java | 208 +-- .../client/temporaryqueue/TemporaryQueueTest.java | 442 +++--- .../qpid/test/unit/close/CloseBeforeAckTest.java | 288 ++-- .../qpid/test/unit/message/JMSDestinationTest.java | 178 +-- 19 files changed, 3205 insertions(+), 3205 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java b/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java index 93f10761e2..7e257e0c20 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java @@ -1,26 +1,26 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client; - -public interface AMQSessionAdapter -{ - public AMQSession getSession(); -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +public interface AMQSessionAdapter +{ + public AMQSession getSession(); +} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java index 768e2862b3..fa2afb3ee4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java @@ -1,40 +1,40 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client; - -import org.apache.qpid.framing.AMQShortString; - -public class AMQUndefinedDestination extends AMQDestination -{ - - private static final AMQShortString UNKNOWN_EXCHANGE_CLASS = new AMQShortString("unknown"); - - - public AMQUndefinedDestination(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName) - { - super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName); - } - - public boolean isNameRequired() - { - return getAMQQueueName() == null; - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import org.apache.qpid.framing.AMQShortString; + +public class AMQUndefinedDestination extends AMQDestination +{ + + private static final AMQShortString UNKNOWN_EXCHANGE_CLASS = new AMQShortString("unknown"); + + + public AMQUndefinedDestination(AMQShortString exchange, AMQShortString routingKey, AMQShortString queueName) + { + super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName); + } + + public boolean isNameRequired() + { + return getAMQQueueName() == null; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java index 8cb285c1ac..7cc548915c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java +++ b/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java @@ -1,66 +1,66 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Enumeration; - -import org.apache.qpid.framing.AMQShortString; - -public enum CustomJMSXProperty -{ - JMS_AMQP_NULL, - JMS_QPID_DESTTYPE, - JMSXGroupID, - JMSXGroupSeq, - JMSXUserID; - - - private final AMQShortString _nameAsShortString; - - CustomJMSXProperty() - { - _nameAsShortString = new AMQShortString(toString()); - } - - public AMQShortString getShortStringName() - { - return _nameAsShortString; - } - - private static Enumeration _names; - - public static synchronized Enumeration asEnumeration() - { - if(_names == null) - { - CustomJMSXProperty[] properties = values(); - ArrayList nameList = new ArrayList(properties.length); - for(CustomJMSXProperty property : properties) - { - nameList.add(property.toString()); - } - _names = Collections.enumeration(nameList); - } - return _names; - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; + +import org.apache.qpid.framing.AMQShortString; + +public enum CustomJMSXProperty +{ + JMS_AMQP_NULL, + JMS_QPID_DESTTYPE, + JMSXGroupID, + JMSXGroupSeq, + JMSXUserID; + + + private final AMQShortString _nameAsShortString; + + CustomJMSXProperty() + { + _nameAsShortString = new AMQShortString(toString()); + } + + public AMQShortString getShortStringName() + { + return _nameAsShortString; + } + + private static Enumeration _names; + + public static synchronized Enumeration asEnumeration() + { + if(_names == null) + { + CustomJMSXProperty[] properties = values(); + ArrayList nameList = new ArrayList(properties.length); + for(CustomJMSXProperty property : properties) + { + nameList.add(property.toString()); + } + _names = Collections.enumeration(nameList); + } + return _names; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java index 03d25aa243..7f8e80c73a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java +++ b/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java @@ -1,38 +1,38 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ - -package org.apache.qpid.client; - -import javax.jms.Destination; -import javax.jms.JMSException; - -/** - * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue - * so that operations related to their "temporary-ness" can be abstracted out. - */ -interface TemporaryDestination extends Destination -{ - - public void delete() throws JMSException; - public AMQSession getSession(); - public boolean isDeleted(); - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.client; + +import javax.jms.Destination; +import javax.jms.JMSException; + +/** + * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue + * so that operations related to their "temporary-ness" can be abstracted out. + */ +interface TemporaryDestination extends Destination +{ + + public void delete() throws JMSException; + public AMQSession getSession(); + public boolean isDeleted(); + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java index 1ec98efe0e..51cc94965a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java @@ -1,75 +1,75 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ - -package org.apache.qpid.client.failover; - -import org.apache.qpid.client.AMQConnection; - -/** - * FailoverNoopSupport is a {@link FailoverSupport} implementation that does not really provide any failover support - * at all. It wraps a {@link FailoverProtectedOperation} but should that operation throw {@link FailoverException} this - * support class simply re-raises that exception as an IllegalStateException. This support wrapper should only be - * used where the caller can be certain that the failover protected operation cannot acutally throw a failover exception, - * for example, because the caller already holds a lock preventing that condition from arising. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Perform a fail-over protected operation raising providing no handling of fail-over conditions. - *
- */ -public class FailoverNoopSupport implements FailoverSupport -{ - /** The protected operation that is to be retried in the event of fail-over. */ - FailoverProtectedOperation operation; - - /** The connection on which the fail-over protected operation is to be performed. */ - AMQConnection connection; - - /** - * Creates an automatic retrying fail-over handler for the specified operation. - * - * @param operation The fail-over protected operation to wrap in this handler. - */ - public FailoverNoopSupport(FailoverProtectedOperation operation, AMQConnection con) - { - this.operation = operation; - this.connection = con; - } - - /** - * Delegates to another continuation which is to be provided with fail-over handling. - * - * @return The return value from the delegated to continuation. - * @throws E Any exception that the delegated to continuation may raise. - */ - public T execute() throws E - { - try - { - return operation.execute(); - } - catch (FailoverException e) - { - throw new IllegalStateException("Fail-over interupted no-op failover support. " - + "No-op support should only be used where the caller is certain fail-over cannot occur.", e); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.client.failover; + +import org.apache.qpid.client.AMQConnection; + +/** + * FailoverNoopSupport is a {@link FailoverSupport} implementation that does not really provide any failover support + * at all. It wraps a {@link FailoverProtectedOperation} but should that operation throw {@link FailoverException} this + * support class simply re-raises that exception as an IllegalStateException. This support wrapper should only be + * used where the caller can be certain that the failover protected operation cannot acutally throw a failover exception, + * for example, because the caller already holds a lock preventing that condition from arising. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Perform a fail-over protected operation raising providing no handling of fail-over conditions. + *
+ */ +public class FailoverNoopSupport implements FailoverSupport +{ + /** The protected operation that is to be retried in the event of fail-over. */ + FailoverProtectedOperation operation; + + /** The connection on which the fail-over protected operation is to be performed. */ + AMQConnection connection; + + /** + * Creates an automatic retrying fail-over handler for the specified operation. + * + * @param operation The fail-over protected operation to wrap in this handler. + */ + public FailoverNoopSupport(FailoverProtectedOperation operation, AMQConnection con) + { + this.operation = operation; + this.connection = con; + } + + /** + * Delegates to another continuation which is to be provided with fail-over handling. + * + * @return The return value from the delegated to continuation. + * @throws E Any exception that the delegated to continuation may raise. + */ + public T execute() throws E + { + try + { + return operation.execute(); + } + catch (FailoverException e) + { + throw new IllegalStateException("Fail-over interupted no-op failover support. " + + "No-op support should only be used where the caller is certain fail-over cannot occur.", e); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java index 9a7f43926e..e9c5f24791 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java @@ -1,49 +1,49 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ - -package org.apache.qpid.client.failover; - -/** - * FailoverProtectedOperation is a continuation for an operation that may throw a {@link FailoverException} because - * it has been interrupted by the fail-over process. The {@link FailoverRetrySupport} class defines support wrappers - * for failover protected operations, in order to provide different handling schemes when failovers occurr. - * - *

The type of checked exception that the operation may perform has been generified, in order that fail over - * protected operations can be defined that raise arbitrary exceptions. The actuall exception types used should not - * be sub-classes of FailoverException, or else catching FailoverException in the {@link FailoverRetrySupport} classes - * will mask the exception. - * - *

- *
CRC Card
Responsibilities - *
Perform an operation that may be interrupted by fail-over. - *
- */ -public interface FailoverProtectedOperation -{ - /** - * Performs the continuations work. - * - * @return Provdes scope for the continuation to return an arbitrary value. - * - * @throws FailoverException If the operation is interrupted by a fail-over notification. - */ - public abstract T execute() throws E, FailoverException; -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.client.failover; + +/** + * FailoverProtectedOperation is a continuation for an operation that may throw a {@link FailoverException} because + * it has been interrupted by the fail-over process. The {@link FailoverRetrySupport} class defines support wrappers + * for failover protected operations, in order to provide different handling schemes when failovers occurr. + * + *

The type of checked exception that the operation may perform has been generified, in order that fail over + * protected operations can be defined that raise arbitrary exceptions. The actuall exception types used should not + * be sub-classes of FailoverException, or else catching FailoverException in the {@link FailoverRetrySupport} classes + * will mask the exception. + * + *

+ *
CRC Card
Responsibilities + *
Perform an operation that may be interrupted by fail-over. + *
+ */ +public interface FailoverProtectedOperation +{ + /** + * Performs the continuations work. + * + * @return Provdes scope for the continuation to return an arbitrary value. + * + * @throws FailoverException If the operation is interrupted by a fail-over notification. + */ + public abstract T execute() throws E, FailoverException; +} diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java index e756d7baf9..cf7e978c03 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java @@ -1,135 +1,135 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.failover; - -import org.apache.qpid.client.AMQConnection; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * FailoverRetrySupport is a continuation that wraps another continuation, delaying its execution until it is notified - * that a blocking condition has been met, and executing the continuation within a mutex. If the continuation fails, due - * to the original condition being broken, whilst the continuation is waiting for a reponse to a synchronous request, - * FailoverRetrySupport automatcally rechecks the condition and re-acquires the mutex and re-runs the continution. This - * automatic retrying is continued until the continuation succeeds, or throws an exception (different to - * FailoverException, which is used to signal the failure of the original condition). - * - *

The blocking condition used is that the connection is not currently failing over, and the mutex used is the - * connection failover mutex, which guards against the fail-over process being run during fail-over vulnerable methods. - * These are used like a lock and condition variable. - * - *

The wrapped operation may throw a FailoverException, this is an exception that can be raised by a - * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener}, in response to it being notified that a - * fail-over wants to start whilst it was waiting. Methods that are vulnerable to fail-over are those that are - * synchronous, where a failure will prevent them from getting the reply they are waiting for and asynchronous - * methods that should not be attempted when a fail-over is in progress. - * - *

Wrapping a synchronous method in a FailoverRetrySupport will have the effect that the operation will not be - * started during fail-over, but be delayed until any current fail-over has completed. Should a fail-over process want - * to start whilst waiting for the synchrnous reply, the FailoverRetrySupport will detect this and rety the operation - * until it succeeds. Synchronous methods are usually coordinated with a - * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener} which is notified when a fail-over process wants - * to start and throws a FailoverException in response to this. - * - *

Wrapping an asynchronous method in a FailoverRetrySupport will have the effect that the operation will not be - * started during fail-over, but be delayed until any current fail-over has completed. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Provide a continuation synchronized on a fail-over lock and condition. - *
Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception. - *
- * - * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see - * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary - * Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation. - * Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type - * to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that. - * - * @todo InterruptedException not handled well. - */ -public class FailoverRetrySupport implements FailoverSupport -{ - /** Used for debugging. */ - private static final Logger _log = LoggerFactory.getLogger(FailoverRetrySupport.class); - - /** The protected operation that is to be retried in the event of fail-over. */ - FailoverProtectedOperation operation; - - /** The connection on which the fail-over protected operation is to be performed. */ - AMQConnection connection; - - /** - * Creates an automatic retrying fail-over handler for the specified operation. - * - * @param operation The fail-over protected operation to wrap in this handler. - */ - public FailoverRetrySupport(FailoverProtectedOperation operation, AMQConnection con) - { - this.operation = operation; - this.connection = con; - } - - /** - * Delays a continuation until the "not failing over" condition is met on the specified connection. Repeats - * until the operation throws AMQException or succeeds without being interrupted by fail-over. - * - * @return The result of executing the continuation. - * - * @throws E Any underlying exception is allowed to fall through. - */ - public T execute() throws E - { - while (true) - { - try - { - connection.blockUntilNotFailingOver(); - } - catch (InterruptedException e) - { - _log.debug("Interrupted: " + e, e); - - return null; - } - - synchronized (connection.getFailoverMutex()) - { - try - { - return operation.execute(); - } - catch (FailoverException e) - { - _log.debug("Failover exception caught during operation: " + e, e); - } - catch (IllegalStateException e) - { - if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support"))) - { - throw e; - } - } - } - } - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.failover; + +import org.apache.qpid.client.AMQConnection; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * FailoverRetrySupport is a continuation that wraps another continuation, delaying its execution until it is notified + * that a blocking condition has been met, and executing the continuation within a mutex. If the continuation fails, due + * to the original condition being broken, whilst the continuation is waiting for a reponse to a synchronous request, + * FailoverRetrySupport automatcally rechecks the condition and re-acquires the mutex and re-runs the continution. This + * automatic retrying is continued until the continuation succeeds, or throws an exception (different to + * FailoverException, which is used to signal the failure of the original condition). + * + *

The blocking condition used is that the connection is not currently failing over, and the mutex used is the + * connection failover mutex, which guards against the fail-over process being run during fail-over vulnerable methods. + * These are used like a lock and condition variable. + * + *

The wrapped operation may throw a FailoverException, this is an exception that can be raised by a + * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener}, in response to it being notified that a + * fail-over wants to start whilst it was waiting. Methods that are vulnerable to fail-over are those that are + * synchronous, where a failure will prevent them from getting the reply they are waiting for and asynchronous + * methods that should not be attempted when a fail-over is in progress. + * + *

Wrapping a synchronous method in a FailoverRetrySupport will have the effect that the operation will not be + * started during fail-over, but be delayed until any current fail-over has completed. Should a fail-over process want + * to start whilst waiting for the synchrnous reply, the FailoverRetrySupport will detect this and rety the operation + * until it succeeds. Synchronous methods are usually coordinated with a + * {@link org.apache.qpid.client.protocol.BlockingMethodFrameListener} which is notified when a fail-over process wants + * to start and throws a FailoverException in response to this. + * + *

Wrapping an asynchronous method in a FailoverRetrySupport will have the effect that the operation will not be + * started during fail-over, but be delayed until any current fail-over has completed. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Provide a continuation synchronized on a fail-over lock and condition. + *
Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception. + *
+ * + * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see + * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary + * Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation. + * Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type + * to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that. + * + * @todo InterruptedException not handled well. + */ +public class FailoverRetrySupport implements FailoverSupport +{ + /** Used for debugging. */ + private static final Logger _log = LoggerFactory.getLogger(FailoverRetrySupport.class); + + /** The protected operation that is to be retried in the event of fail-over. */ + FailoverProtectedOperation operation; + + /** The connection on which the fail-over protected operation is to be performed. */ + AMQConnection connection; + + /** + * Creates an automatic retrying fail-over handler for the specified operation. + * + * @param operation The fail-over protected operation to wrap in this handler. + */ + public FailoverRetrySupport(FailoverProtectedOperation operation, AMQConnection con) + { + this.operation = operation; + this.connection = con; + } + + /** + * Delays a continuation until the "not failing over" condition is met on the specified connection. Repeats + * until the operation throws AMQException or succeeds without being interrupted by fail-over. + * + * @return The result of executing the continuation. + * + * @throws E Any underlying exception is allowed to fall through. + */ + public T execute() throws E + { + while (true) + { + try + { + connection.blockUntilNotFailingOver(); + } + catch (InterruptedException e) + { + _log.debug("Interrupted: " + e, e); + + return null; + } + + synchronized (connection.getFailoverMutex()) + { + try + { + return operation.execute(); + } + catch (FailoverException e) + { + _log.debug("Failover exception caught during operation: " + e, e); + } + catch (IllegalStateException e) + { + if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support"))) + { + throw e; + } + } + } + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java index a150d1446a..5bd36aa88b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java @@ -1,36 +1,36 @@ -package org.apache.qpid.client.handler; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.framing.*; -import org.apache.qpid.client.state.StateAwareMethodListener; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.AMQNoConsumersException; -import org.apache.qpid.client.AMQNoRouteException; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInvalidRoutingKeyException; -import org.apache.qpid.AMQChannelClosedException; -import org.apache.qpid.protocol.AMQConstant; - -public class AccessRequestOkMethodHandler implements StateAwareMethodListener -{ - private static final Logger _logger = LoggerFactory.getLogger(AccessRequestOkMethodHandler.class); - - private static AccessRequestOkMethodHandler _handler = new AccessRequestOkMethodHandler(); - - public static AccessRequestOkMethodHandler getInstance() - { - return _handler; - } - - public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId) - throws AMQException - { - _logger.debug("AccessRequestOk method received"); - final AMQProtocolSession session = stateManager.getProtocolSession(); - session.setTicket(method.getTicket(), channelId); - - } -} +package org.apache.qpid.client.handler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.framing.*; +import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.protocol.AMQProtocolSession; +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.protocol.AMQConstant; + +public class AccessRequestOkMethodHandler implements StateAwareMethodListener +{ + private static final Logger _logger = LoggerFactory.getLogger(AccessRequestOkMethodHandler.class); + + private static AccessRequestOkMethodHandler _handler = new AccessRequestOkMethodHandler(); + + public static AccessRequestOkMethodHandler getInstance() + { + return _handler; + } + + public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId) + throws AMQException + { + _logger.debug("AccessRequestOk method received"); + final AMQProtocolSession session = stateManager.getProtocolSession(); + session.setTicket(method.getTicket(), channelId); + + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index de976b05bd..afb7517a12 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -1,528 +1,528 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.handler; - -import java.util.Map; -import java.util.HashMap; - -import org.apache.qpid.framing.*; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.AMQMethodNotImplementedException; - - -public class ClientMethodDispatcherImpl implements MethodDispatcher -{ - - - private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance(); - private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance(); - private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance(); - private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance(); - private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance(); - private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); - private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance(); - private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance(); - private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance(); - private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance(); - private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance(); - private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance(); - private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance(); - - - - private static interface DispatcherFactory - { - public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager); - } - - private static final Map _dispatcherFactories = - new HashMap(); - - static - { - _dispatcherFactories.put(ProtocolVersion.v8_0, - new DispatcherFactory() - { - public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager) - { - return new ClientMethodDispatcherImpl_8_0(stateManager); - } - }); - - _dispatcherFactories.put(ProtocolVersion.v0_9, - new DispatcherFactory() - { - public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager) - { - return new ClientMethodDispatcherImpl_0_9(stateManager); - } - }); - - } - - - public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager) - { - DispatcherFactory factory = _dispatcherFactories.get(version); - return factory.createMethodDispatcher(stateManager); - } - - - - - private AMQStateManager _stateManager; - - public ClientMethodDispatcherImpl(AMQStateManager stateManager) - { - _stateManager = stateManager; - } - - - public AMQStateManager getStateManager() - { - return _stateManager; - } - - public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException - { - _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId); - return true; - } - - public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException - { - _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId); - return true; - } - - public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException - { - _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId); - return true; - } - - public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException - { - _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId); - return true; - } - - public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException - { - _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId); - return true; - } - - public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException - { - _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId); - return true; - } - - public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException - { - _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId); - return true; - } - - public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException - { - _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId); - return true; - } - - public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException - { - _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId); - return true; - } - - public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException - { - _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId); - return true; - } - - public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException - { - _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId); - return true; - } - - public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException - { - _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId); - return true; - } - - public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException - { - _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId); - return true; - } - - public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException - { - return false; - } - -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler; + +import java.util.Map; +import java.util.HashMap; + +import org.apache.qpid.framing.*; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.AMQMethodNotImplementedException; + + +public class ClientMethodDispatcherImpl implements MethodDispatcher +{ + + + private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance(); + private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance(); + private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance(); + private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance(); + private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance(); + private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); + private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance(); + private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance(); + private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance(); + private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance(); + private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance(); + private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance(); + private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance(); + + + + private static interface DispatcherFactory + { + public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager); + } + + private static final Map _dispatcherFactories = + new HashMap(); + + static + { + _dispatcherFactories.put(ProtocolVersion.v8_0, + new DispatcherFactory() + { + public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager) + { + return new ClientMethodDispatcherImpl_8_0(stateManager); + } + }); + + _dispatcherFactories.put(ProtocolVersion.v0_9, + new DispatcherFactory() + { + public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager) + { + return new ClientMethodDispatcherImpl_0_9(stateManager); + } + }); + + } + + + public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager) + { + DispatcherFactory factory = _dispatcherFactories.get(version); + return factory.createMethodDispatcher(stateManager); + } + + + + + private AMQStateManager _stateManager; + + public ClientMethodDispatcherImpl(AMQStateManager stateManager) + { + _stateManager = stateManager; + } + + + public AMQStateManager getStateManager() + { + return _stateManager; + } + + public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException + { + _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException + { + _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException + { + _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException + { + _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException + { + _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException + { + _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException + { + _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException + { + _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException + { + _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException + { + _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException + { + _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException + { + _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException + { + _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId); + return true; + } + + public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException + { + return false; + } + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java index ae6d5e8283..e235368357 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java @@ -1,155 +1,155 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.handler; - -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.state.AMQStateManager; -import org.apache.qpid.client.state.AMQMethodNotImplementedException; - - -public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9 -{ - public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager) - { - super(stateManager); - } - - - public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException - { - throw new AMQMethodNotImplementedException(body); - } - - public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException - { - return false; - } - - -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.handler; + +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.state.AMQMethodNotImplementedException; + + +public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9 +{ + public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager) + { + super(stateManager); + } + + + public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException + { + throw new AMQMethodNotImplementedException(body); + } + + public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException + { + return false; + } + + +} diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java index 6bd6874cde..b0f003cd2d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java @@ -1,85 +1,85 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - package org.apache.qpid.client.handler; - -import org.apache.qpid.framing.*; -import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.state.AMQStateManager; - -public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0 -{ - public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager) - { - super(stateManager); - } - - public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException - { - return false; - } - - public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException - { - return false; - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + package org.apache.qpid.client.handler; + +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.state.AMQStateManager; + +public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0 +{ + public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager) + { + super(stateManager); + } + + public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException + { + return false; + } + + public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException + { + return false; + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java index 5904131122..9bdde01bf3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java @@ -1,802 +1,802 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ - -package org.apache.qpid.client.message; - -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; - -import javax.jms.JMSException; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; - -/** - * @author Apache Software Foundation - */ -public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage -{ - - protected static final byte BOOLEAN_TYPE = (byte) 1; - - protected static final byte BYTE_TYPE = (byte) 2; - - protected static final byte BYTEARRAY_TYPE = (byte) 3; - - protected static final byte SHORT_TYPE = (byte) 4; - - protected static final byte CHAR_TYPE = (byte) 5; - - protected static final byte INT_TYPE = (byte) 6; - - protected static final byte LONG_TYPE = (byte) 7; - - protected static final byte FLOAT_TYPE = (byte) 8; - - protected static final byte DOUBLE_TYPE = (byte) 9; - - protected static final byte STRING_TYPE = (byte) 10; - - protected static final byte NULL_STRING_TYPE = (byte) 11; - - /** - * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read - * a byte array in multiple chunks, hence this is used to track how much is left to be read - */ - private int _byteArrayRemaining = -1; - - AbstractBytesTypedMessage() - { - this(null); - } - - /** - * Construct a stream message with existing data. - * - * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is - * set to auto expand - */ - AbstractBytesTypedMessage(ByteBuffer data) - { - super(data); // this instanties a content header - } - - - AbstractBytesTypedMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, - AMQShortString routingKey, ByteBuffer data) throws AMQException - { - super(messageNbr, contentHeader, exchange, routingKey, data); - } - - - protected byte readWireType() throws MessageFormatException, MessageEOFException, - MessageNotReadableException - { - checkReadable(); - checkAvailable(1); - return _data.get(); - } - - protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException - { - checkWritable(); - _data.put(type); - _changedData = true; - } - - protected boolean readBoolean() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - boolean result; - try - { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Boolean.parseBoolean(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private boolean readBooleanImpl() - { - return _data.get() != 0; - } - - protected byte readByte() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - byte result; - try - { - switch (wireType) - { - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Byte.parseByte(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; - } - - private byte readByteImpl() - { - return _data.get(); - } - - protected short readShort() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - short result; - try - { - switch (wireType) - { - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Short.parseShort(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a short"); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - return result; - } - - private short readShortImpl() - { - return _data.getShort(); - } - - /** - * Note that this method reads a unicode character as two bytes from the stream - * - * @return the character read from the stream - * @throws javax.jms.JMSException - */ - protected char readChar() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - try - { - if(wireType == NULL_STRING_TYPE){ - throw new NullPointerException(); - } - - if (wireType != CHAR_TYPE) - { - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a char"); - } - else - { - checkAvailable(2); - return readCharImpl(); - } - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private char readCharImpl() - { - return _data.getChar(); - } - - protected int readInt() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - int result; - try - { - switch (wireType) - { - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Integer.parseInt(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to an int"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected int readIntImpl() - { - return _data.getInt(); - } - - protected long readLong() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - long result; - try - { - switch (wireType) - { - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Long.parseLong(readStringImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a long"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private long readLongImpl() - { - return _data.getLong(); - } - - protected float readFloat() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - float result; - try - { - switch (wireType) - { - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Float.parseFloat(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a float"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private float readFloatImpl() - { - return _data.getFloat(); - } - - protected double readDouble() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - double result; - try - { - switch (wireType) - { - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case STRING_TYPE: - checkAvailable(1); - result = Double.parseDouble(readStringImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a double"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - private double readDoubleImpl() - { - return _data.getDouble(); - } - - protected String readString() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - String result; - try - { - switch (wireType) - { - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - case NULL_STRING_TYPE: - result = null; - throw new NullPointerException("data is null"); - case BOOLEAN_TYPE: - checkAvailable(1); - result = String.valueOf(readBooleanImpl()); - break; - case LONG_TYPE: - checkAvailable(8); - result = String.valueOf(readLongImpl()); - break; - case INT_TYPE: - checkAvailable(4); - result = String.valueOf(readIntImpl()); - break; - case SHORT_TYPE: - checkAvailable(2); - result = String.valueOf(readShortImpl()); - break; - case BYTE_TYPE: - checkAvailable(1); - result = String.valueOf(readByteImpl()); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = String.valueOf(readFloatImpl()); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = String.valueOf(readDoubleImpl()); - break; - case CHAR_TYPE: - checkAvailable(2); - result = String.valueOf(readCharImpl()); - break; - default: - _data.position(position); - throw new MessageFormatException("Unable to convert " + wireType + " to a String"); - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected String readStringImpl() throws JMSException - { - try - { - return _data.getString(Charset.forName("UTF-8").newDecoder()); - } - catch (CharacterCodingException e) - { - JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e); - je.setLinkedException(e); - throw je; - } - } - - protected int readBytes(byte[] bytes) throws JMSException - { - if (bytes == null) - { - throw new IllegalArgumentException("byte array must not be null"); - } - checkReadable(); - // first call - if (_byteArrayRemaining == -1) - { - // type discriminator checked separately so you get a MessageFormatException rather than - // an EOF even in the case where both would be applicable - checkAvailable(1); - byte wireType = readWireType(); - if (wireType != BYTEARRAY_TYPE) - { - throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); - } - checkAvailable(4); - int size = _data.getInt(); - // length of -1 indicates null - if (size == -1) - { - return -1; - } - else - { - if (size > _data.remaining()) - { - throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " + - _data.remaining() + " bytes"); - } - else - { - _byteArrayRemaining = size; - } - } - } - else if (_byteArrayRemaining == 0) - { - _byteArrayRemaining = -1; - return -1; - } - - int returnedSize = readBytesImpl(bytes); - if (returnedSize < bytes.length) - { - _byteArrayRemaining = -1; - } - return returnedSize; - } - - private int readBytesImpl(byte[] bytes) - { - int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); - _byteArrayRemaining -= count; - - if (count == 0) - { - return 0; - } - else - { - _data.get(bytes, 0, count); - return count; - } - } - - protected Object readObject() throws JMSException - { - int position = _data.position(); - byte wireType = readWireType(); - Object result = null; - try - { - switch (wireType) - { - case BOOLEAN_TYPE: - checkAvailable(1); - result = readBooleanImpl(); - break; - case BYTE_TYPE: - checkAvailable(1); - result = readByteImpl(); - break; - case BYTEARRAY_TYPE: - checkAvailable(4); - int size = _data.getInt(); - if (size == -1) - { - result = null; - } - else - { - _byteArrayRemaining = size; - byte[] bytesResult = new byte[size]; - readBytesImpl(bytesResult); - result = bytesResult; - } - break; - case SHORT_TYPE: - checkAvailable(2); - result = readShortImpl(); - break; - case CHAR_TYPE: - checkAvailable(2); - result = readCharImpl(); - break; - case INT_TYPE: - checkAvailable(4); - result = readIntImpl(); - break; - case LONG_TYPE: - checkAvailable(8); - result = readLongImpl(); - break; - case FLOAT_TYPE: - checkAvailable(4); - result = readFloatImpl(); - break; - case DOUBLE_TYPE: - checkAvailable(8); - result = readDoubleImpl(); - break; - case NULL_STRING_TYPE: - result = null; - break; - case STRING_TYPE: - checkAvailable(1); - result = readStringImpl(); - break; - } - return result; - } - catch (RuntimeException e) - { - _data.position(position); - throw e; - } - } - - protected void writeBoolean(boolean b) throws JMSException - { - writeTypeDiscriminator(BOOLEAN_TYPE); - _data.put(b ? (byte) 1 : (byte) 0); - } - - protected void writeByte(byte b) throws JMSException - { - writeTypeDiscriminator(BYTE_TYPE); - _data.put(b); - } - - protected void writeShort(short i) throws JMSException - { - writeTypeDiscriminator(SHORT_TYPE); - _data.putShort(i); - } - - protected void writeChar(char c) throws JMSException - { - writeTypeDiscriminator(CHAR_TYPE); - _data.putChar(c); - } - - protected void writeInt(int i) throws JMSException - { - writeTypeDiscriminator(INT_TYPE); - writeIntImpl(i); - } - - protected void writeIntImpl(int i) - { - _data.putInt(i); - } - - protected void writeLong(long l) throws JMSException - { - writeTypeDiscriminator(LONG_TYPE); - _data.putLong(l); - } - - protected void writeFloat(float v) throws JMSException - { - writeTypeDiscriminator(FLOAT_TYPE); - _data.putFloat(v); - } - - protected void writeDouble(double v) throws JMSException - { - writeTypeDiscriminator(DOUBLE_TYPE); - _data.putDouble(v); - } - - protected void writeString(String string) throws JMSException - { - if (string == null) - { - writeTypeDiscriminator(NULL_STRING_TYPE); - } - else - { - writeTypeDiscriminator(STRING_TYPE); - try - { - writeStringImpl(string); - } - catch (CharacterCodingException e) - { - JMSException ex = new JMSException("Unable to encode string: " + e); - ex.setLinkedException(e); - throw ex; - } - } - } - - protected void writeStringImpl(String string) - throws CharacterCodingException - { - _data.putString(string, Charset.forName("UTF-8").newEncoder()); - // we must write the null terminator ourselves - _data.put((byte) 0); - } - - protected void writeBytes(byte[] bytes) throws JMSException - { - writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); - } - - protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException - { - writeTypeDiscriminator(BYTEARRAY_TYPE); - if (bytes == null) - { - _data.putInt(-1); - } - else - { - _data.putInt(length); - _data.put(bytes, offset, length); - } - } - - protected void writeObject(Object object) throws JMSException - { - checkWritable(); - Class clazz; - - if (object == null) - { - // string handles the output of null values - clazz = String.class; - } - else - { - clazz = object.getClass(); - } - - if (clazz == Byte.class) - { - writeByte((Byte) object); - } - else if (clazz == Boolean.class) - { - writeBoolean((Boolean) object); - } - else if (clazz == byte[].class) - { - writeBytes((byte[]) object); - } - else if (clazz == Short.class) - { - writeShort((Short) object); - } - else if (clazz == Character.class) - { - writeChar((Character) object); - } - else if (clazz == Integer.class) - { - writeInt((Integer) object); - } - else if (clazz == Long.class) - { - writeLong((Long) object); - } - else if (clazz == Float.class) - { - writeFloat((Float) object); - } - else if (clazz == Double.class) - { - writeDouble((Double) object); - } - else if (clazz == String.class) - { - writeString((String) object); - } - else - { - throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.client.message; + +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; + +/** + * @author Apache Software Foundation + */ +public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage +{ + + protected static final byte BOOLEAN_TYPE = (byte) 1; + + protected static final byte BYTE_TYPE = (byte) 2; + + protected static final byte BYTEARRAY_TYPE = (byte) 3; + + protected static final byte SHORT_TYPE = (byte) 4; + + protected static final byte CHAR_TYPE = (byte) 5; + + protected static final byte INT_TYPE = (byte) 6; + + protected static final byte LONG_TYPE = (byte) 7; + + protected static final byte FLOAT_TYPE = (byte) 8; + + protected static final byte DOUBLE_TYPE = (byte) 9; + + protected static final byte STRING_TYPE = (byte) 10; + + protected static final byte NULL_STRING_TYPE = (byte) 11; + + /** + * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read + * a byte array in multiple chunks, hence this is used to track how much is left to be read + */ + private int _byteArrayRemaining = -1; + + AbstractBytesTypedMessage() + { + this(null); + } + + /** + * Construct a stream message with existing data. + * + * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is + * set to auto expand + */ + AbstractBytesTypedMessage(ByteBuffer data) + { + super(data); // this instanties a content header + } + + + AbstractBytesTypedMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, + AMQShortString routingKey, ByteBuffer data) throws AMQException + { + super(messageNbr, contentHeader, exchange, routingKey, data); + } + + + protected byte readWireType() throws MessageFormatException, MessageEOFException, + MessageNotReadableException + { + checkReadable(); + checkAvailable(1); + return _data.get(); + } + + protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException + { + checkWritable(); + _data.put(type); + _changedData = true; + } + + protected boolean readBoolean() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + boolean result; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Boolean.parseBoolean(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a boolean"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private boolean readBooleanImpl() + { + return _data.get() != 0; + } + + protected byte readByte() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + byte result; + try + { + switch (wireType) + { + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Byte.parseByte(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a byte"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + private byte readByteImpl() + { + return _data.get(); + } + + protected short readShort() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + short result; + try + { + switch (wireType) + { + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Short.parseShort(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a short"); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + return result; + } + + private short readShortImpl() + { + return _data.getShort(); + } + + /** + * Note that this method reads a unicode character as two bytes from the stream + * + * @return the character read from the stream + * @throws javax.jms.JMSException + */ + protected char readChar() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + try + { + if(wireType == NULL_STRING_TYPE){ + throw new NullPointerException(); + } + + if (wireType != CHAR_TYPE) + { + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a char"); + } + else + { + checkAvailable(2); + return readCharImpl(); + } + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private char readCharImpl() + { + return _data.getChar(); + } + + protected int readInt() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + int result; + try + { + switch (wireType) + { + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Integer.parseInt(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to an int"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected int readIntImpl() + { + return _data.getInt(); + } + + protected long readLong() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + long result; + try + { + switch (wireType) + { + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Long.parseLong(readStringImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a long"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private long readLongImpl() + { + return _data.getLong(); + } + + protected float readFloat() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + float result; + try + { + switch (wireType) + { + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Float.parseFloat(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a float"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private float readFloatImpl() + { + return _data.getFloat(); + } + + protected double readDouble() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + double result; + try + { + switch (wireType) + { + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case STRING_TYPE: + checkAvailable(1); + result = Double.parseDouble(readStringImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a double"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + private double readDoubleImpl() + { + return _data.getDouble(); + } + + protected String readString() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + String result; + try + { + switch (wireType) + { + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + case NULL_STRING_TYPE: + result = null; + throw new NullPointerException("data is null"); + case BOOLEAN_TYPE: + checkAvailable(1); + result = String.valueOf(readBooleanImpl()); + break; + case LONG_TYPE: + checkAvailable(8); + result = String.valueOf(readLongImpl()); + break; + case INT_TYPE: + checkAvailable(4); + result = String.valueOf(readIntImpl()); + break; + case SHORT_TYPE: + checkAvailable(2); + result = String.valueOf(readShortImpl()); + break; + case BYTE_TYPE: + checkAvailable(1); + result = String.valueOf(readByteImpl()); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = String.valueOf(readFloatImpl()); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = String.valueOf(readDoubleImpl()); + break; + case CHAR_TYPE: + checkAvailable(2); + result = String.valueOf(readCharImpl()); + break; + default: + _data.position(position); + throw new MessageFormatException("Unable to convert " + wireType + " to a String"); + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected String readStringImpl() throws JMSException + { + try + { + return _data.getString(Charset.forName("UTF-8").newDecoder()); + } + catch (CharacterCodingException e) + { + JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e); + je.setLinkedException(e); + throw je; + } + } + + protected int readBytes(byte[] bytes) throws JMSException + { + if (bytes == null) + { + throw new IllegalArgumentException("byte array must not be null"); + } + checkReadable(); + // first call + if (_byteArrayRemaining == -1) + { + // type discriminator checked separately so you get a MessageFormatException rather than + // an EOF even in the case where both would be applicable + checkAvailable(1); + byte wireType = readWireType(); + if (wireType != BYTEARRAY_TYPE) + { + throw new MessageFormatException("Unable to convert " + wireType + " to a byte array"); + } + checkAvailable(4); + int size = _data.getInt(); + // length of -1 indicates null + if (size == -1) + { + return -1; + } + else + { + if (size > _data.remaining()) + { + throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " + + _data.remaining() + " bytes"); + } + else + { + _byteArrayRemaining = size; + } + } + } + else if (_byteArrayRemaining == 0) + { + _byteArrayRemaining = -1; + return -1; + } + + int returnedSize = readBytesImpl(bytes); + if (returnedSize < bytes.length) + { + _byteArrayRemaining = -1; + } + return returnedSize; + } + + private int readBytesImpl(byte[] bytes) + { + int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining); + _byteArrayRemaining -= count; + + if (count == 0) + { + return 0; + } + else + { + _data.get(bytes, 0, count); + return count; + } + } + + protected Object readObject() throws JMSException + { + int position = _data.position(); + byte wireType = readWireType(); + Object result = null; + try + { + switch (wireType) + { + case BOOLEAN_TYPE: + checkAvailable(1); + result = readBooleanImpl(); + break; + case BYTE_TYPE: + checkAvailable(1); + result = readByteImpl(); + break; + case BYTEARRAY_TYPE: + checkAvailable(4); + int size = _data.getInt(); + if (size == -1) + { + result = null; + } + else + { + _byteArrayRemaining = size; + byte[] bytesResult = new byte[size]; + readBytesImpl(bytesResult); + result = bytesResult; + } + break; + case SHORT_TYPE: + checkAvailable(2); + result = readShortImpl(); + break; + case CHAR_TYPE: + checkAvailable(2); + result = readCharImpl(); + break; + case INT_TYPE: + checkAvailable(4); + result = readIntImpl(); + break; + case LONG_TYPE: + checkAvailable(8); + result = readLongImpl(); + break; + case FLOAT_TYPE: + checkAvailable(4); + result = readFloatImpl(); + break; + case DOUBLE_TYPE: + checkAvailable(8); + result = readDoubleImpl(); + break; + case NULL_STRING_TYPE: + result = null; + break; + case STRING_TYPE: + checkAvailable(1); + result = readStringImpl(); + break; + } + return result; + } + catch (RuntimeException e) + { + _data.position(position); + throw e; + } + } + + protected void writeBoolean(boolean b) throws JMSException + { + writeTypeDiscriminator(BOOLEAN_TYPE); + _data.put(b ? (byte) 1 : (byte) 0); + } + + protected void writeByte(byte b) throws JMSException + { + writeTypeDiscriminator(BYTE_TYPE); + _data.put(b); + } + + protected void writeShort(short i) throws JMSException + { + writeTypeDiscriminator(SHORT_TYPE); + _data.putShort(i); + } + + protected void writeChar(char c) throws JMSException + { + writeTypeDiscriminator(CHAR_TYPE); + _data.putChar(c); + } + + protected void writeInt(int i) throws JMSException + { + writeTypeDiscriminator(INT_TYPE); + writeIntImpl(i); + } + + protected void writeIntImpl(int i) + { + _data.putInt(i); + } + + protected void writeLong(long l) throws JMSException + { + writeTypeDiscriminator(LONG_TYPE); + _data.putLong(l); + } + + protected void writeFloat(float v) throws JMSException + { + writeTypeDiscriminator(FLOAT_TYPE); + _data.putFloat(v); + } + + protected void writeDouble(double v) throws JMSException + { + writeTypeDiscriminator(DOUBLE_TYPE); + _data.putDouble(v); + } + + protected void writeString(String string) throws JMSException + { + if (string == null) + { + writeTypeDiscriminator(NULL_STRING_TYPE); + } + else + { + writeTypeDiscriminator(STRING_TYPE); + try + { + writeStringImpl(string); + } + catch (CharacterCodingException e) + { + JMSException ex = new JMSException("Unable to encode string: " + e); + ex.setLinkedException(e); + throw ex; + } + } + } + + protected void writeStringImpl(String string) + throws CharacterCodingException + { + _data.putString(string, Charset.forName("UTF-8").newEncoder()); + // we must write the null terminator ourselves + _data.put((byte) 0); + } + + protected void writeBytes(byte[] bytes) throws JMSException + { + writeBytes(bytes, 0, bytes == null ? 0 : bytes.length); + } + + protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException + { + writeTypeDiscriminator(BYTEARRAY_TYPE); + if (bytes == null) + { + _data.putInt(-1); + } + else + { + _data.putInt(length); + _data.put(bytes, offset, length); + } + } + + protected void writeObject(Object object) throws JMSException + { + checkWritable(); + Class clazz; + + if (object == null) + { + // string handles the output of null values + clazz = String.class; + } + else + { + clazz = object.getClass(); + } + + if (clazz == Byte.class) + { + writeByte((Byte) object); + } + else if (clazz == Boolean.class) + { + writeBoolean((Boolean) object); + } + else if (clazz == byte[].class) + { + writeBytes((byte[]) object); + } + else if (clazz == Short.class) + { + writeShort((Short) object); + } + else if (clazz == Character.class) + { + writeChar((Character) object); + } + else if (clazz == Integer.class) + { + writeInt((Integer) object); + } + else if (clazz == Long.class) + { + writeLong((Long) object); + } + else if (clazz == Float.class) + { + writeFloat((Float) object); + } + else if (clazz == Double.class) + { + writeDouble((Double) object); + } + else if (clazz == String.class) + { + writeString((String) object); + } + else + { + throw new MessageFormatException("Only primitives plus byte arrays and String are valid types"); + } + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java index 39b4e1e27b..fec0117a03 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java @@ -1,552 +1,552 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.message; - -import java.util.Enumeration; - -import javax.jms.JMSException; -import javax.jms.MessageFormatException; - -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQPInvalidClassException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; - - -public final class JMSHeaderAdapter -{ - private final FieldTable _headers; - - public JMSHeaderAdapter(FieldTable headers) - { - _headers = headers; - } - - - public FieldTable getHeaders() - { - return _headers; - } - - public boolean getBoolean(String string) throws JMSException - { - checkPropertyName(string); - Boolean b = getHeaders().getBoolean(string); - - if (b == null) - { - if (getHeaders().containsKey(string)) - { - Object str = getHeaders().getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getBoolean can't use " + string + " item."); - } - else - { - return Boolean.valueOf((String) str); - } - } - else - { - b = Boolean.valueOf(null); - } - } - - return b; - } - - public boolean getBoolean(AMQShortString string) throws JMSException - { - checkPropertyName(string); - Boolean b = getHeaders().getBoolean(string); - - if (b == null) - { - if (getHeaders().containsKey(string)) - { - Object str = getHeaders().getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getBoolean can't use " + string + " item."); - } - else - { - return Boolean.valueOf((String) str); - } - } - else - { - b = Boolean.valueOf(null); - } - } - - return b; - } - - public char getCharacter(String string) throws JMSException - { - checkPropertyName(string); - Character c = getHeaders().getCharacter(string); - - if (c == null) - { - if (getHeaders().isNullStringValue(string)) - { - throw new NullPointerException("Cannot convert null char"); - } - else - { - throw new MessageFormatException("getChar can't use " + string + " item."); - } - } - else - { - return (char) c; - } - } - - public byte[] getBytes(String string) throws JMSException - { - return getBytes(new AMQShortString(string)); - } - - public byte[] getBytes(AMQShortString string) throws JMSException - { - checkPropertyName(string); - - byte[] bs = getHeaders().getBytes(string); - - if (bs == null) - { - throw new MessageFormatException("getBytes can't use " + string + " item."); - } - else - { - return bs; - } - } - - public byte getByte(String string) throws JMSException - { - checkPropertyName(string); - Byte b = getHeaders().getByte(string); - if (b == null) - { - if (getHeaders().containsKey(string)) - { - Object str = getHeaders().getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getByte can't use " + string + " item."); - } - else - { - return Byte.valueOf((String) str); - } - } - else - { - b = Byte.valueOf(null); - } - } - - return b; - } - - public short getShort(String string) throws JMSException - { - checkPropertyName(string); - Short s = getHeaders().getShort(string); - - if (s == null) - { - s = Short.valueOf(getByte(string)); - } - - return s; - } - - public int getInteger(String string) throws JMSException - { - checkPropertyName(string); - Integer i = getHeaders().getInteger(string); - - if (i == null) - { - i = Integer.valueOf(getShort(string)); - } - - return i; - } - - public long getLong(String string) throws JMSException - { - checkPropertyName(string); - Long l = getHeaders().getLong(string); - - if (l == null) - { - l = Long.valueOf(getInteger(string)); - } - - return l; - } - - public float getFloat(String string) throws JMSException - { - checkPropertyName(string); - Float f = getHeaders().getFloat(string); - - if (f == null) - { - if (getHeaders().containsKey(string)) - { - Object str = getHeaders().getObject(string); - - if (str == null || !(str instanceof String)) - { - throw new MessageFormatException("getFloat can't use " + string + " item."); - } - else - { - return Float.valueOf((String) str); - } - } - else - { - f = Float.valueOf(null); - } - - } - - return f; - } - - public double getDouble(String string) throws JMSException - { - checkPropertyName(string); - Double d = getHeaders().getDouble(string); - - if (d == null) - { - d = Double.valueOf(getFloat(string)); - } - - return d; - } - - public String getString(String string) throws JMSException - { - checkPropertyName(string); - String s = getHeaders().getString(string); - - if (s == null) - { - if (getHeaders().containsKey(string)) - { - Object o = getHeaders().getObject(string); - if (o instanceof byte[]) - { - throw new MessageFormatException("getObject couldn't find " + string + " item."); - } - else - { - if (o == null) - { - return null; - } - else - { - s = String.valueOf(o); - } - } - }//else return s // null; - } - - return s; - } - - public Object getObject(String string) throws JMSException - { - checkPropertyName(string); - return getHeaders().getObject(string); - } - - public void setBoolean(AMQShortString string, boolean b) throws JMSException - { - checkPropertyName(string); - getHeaders().setBoolean(string, b); - } - - public void setBoolean(String string, boolean b) throws JMSException - { - checkPropertyName(string); - getHeaders().setBoolean(string, b); - } - - public void setChar(String string, char c) throws JMSException - { - checkPropertyName(string); - getHeaders().setChar(string, c); - } - - public Object setBytes(AMQShortString string, byte[] bytes) - { - checkPropertyName(string); - return getHeaders().setBytes(string, bytes); - } - - public Object setBytes(String string, byte[] bytes) - { - checkPropertyName(string); - return getHeaders().setBytes(string, bytes); - } - - public Object setBytes(String string, byte[] bytes, int start, int length) - { - checkPropertyName(string); - return getHeaders().setBytes(string, bytes, start, length); - } - - public void setByte(String string, byte b) throws JMSException - { - checkPropertyName(string); - getHeaders().setByte(string, b); - } - - public void setByte(AMQShortString string, byte b) throws JMSException - { - checkPropertyName(string); - getHeaders().setByte(string, b); - } - - - public void setShort(String string, short i) throws JMSException - { - checkPropertyName(string); - getHeaders().setShort(string, i); - } - - public void setInteger(String string, int i) throws JMSException - { - checkPropertyName(string); - getHeaders().setInteger(string, i); - } - - public void setInteger(AMQShortString string, int i) throws JMSException - { - checkPropertyName(string); - getHeaders().setInteger(string, i); - } - - public void setLong(String string, long l) throws JMSException - { - checkPropertyName(string); - getHeaders().setLong(string, l); - } - - public void setFloat(String string, float v) throws JMSException - { - checkPropertyName(string); - getHeaders().setFloat(string, v); - } - - public void setDouble(String string, double v) throws JMSException - { - checkPropertyName(string); - getHeaders().setDouble(string, v); - } - - public void setString(String string, String string1) throws JMSException - { - checkPropertyName(string); - getHeaders().setString(string, string1); - } - - public void setString(AMQShortString string, String string1) throws JMSException - { - checkPropertyName(string); - getHeaders().setString(string, string1); - } - - public void setObject(String string, Object object) throws JMSException - { - checkPropertyName(string); - try - { - getHeaders().setObject(string, object); - } - catch (AMQPInvalidClassException aice) - { - MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass()); - mfe.setLinkedException(aice); - throw mfe; - } - } - - public boolean itemExists(String string) throws JMSException - { - checkPropertyName(string); - return getHeaders().containsKey(string); - } - - public Enumeration getPropertyNames() - { - return getHeaders().getPropertyNames(); - } - - public void clear() - { - getHeaders().clear(); - } - - public boolean propertyExists(AMQShortString propertyName) - { - checkPropertyName(propertyName); - return getHeaders().propertyExists(propertyName); - } - - public boolean propertyExists(String propertyName) - { - checkPropertyName(propertyName); - return getHeaders().propertyExists(propertyName); - } - - public Object put(Object key, Object value) - { - checkPropertyName(key.toString()); - return getHeaders().setObject(key.toString(), value); - } - - public Object remove(AMQShortString propertyName) - { - checkPropertyName(propertyName); - return getHeaders().remove(propertyName); - } - - public Object remove(String propertyName) - { - checkPropertyName(propertyName); - return getHeaders().remove(propertyName); - } - - public boolean isEmpty() - { - return getHeaders().isEmpty(); - } - - public void writeToBuffer(ByteBuffer data) - { - getHeaders().writeToBuffer(data); - } - - public Enumeration getMapNames() - { - return getPropertyNames(); - } - - protected static void checkPropertyName(CharSequence propertyName) - { - if (propertyName == null) - { - throw new IllegalArgumentException("Property name must not be null"); - } - else if (propertyName.length() == 0) - { - throw new IllegalArgumentException("Property name must not be the empty string"); - } - - checkIdentiferFormat(propertyName); - } - - protected static void checkIdentiferFormat(CharSequence propertyName) - { -// JMS requirements 3.5.1 Property Names -// Identifiers: -// - An identifier is an unlimited-length character sequence that must begin -// with a Java identifier start character; all following characters must be Java -// identifier part characters. An identifier start character is any character for -// which the method Character.isJavaIdentifierStart returns true. This includes -// '_' and '$'. An identifier part character is any character for which the -// method Character.isJavaIdentifierPart returns true. -// - Identifiers cannot be the names NULL, TRUE, or FALSE. -// � Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or -// ESCAPE. -// � Identifiers are either header field references or property references. The -// type of a property value in a message selector corresponds to the type -// used to set the property. If a property that does not exist in a message is -// referenced, its value is NULL. The semantics of evaluating NULL values -// in a selector are described in Section 3.8.1.2, �Null Values.� -// � The conversions that apply to the get methods for properties do not -// apply when a property is used in a message selector expression. For -// example, suppose you set a property as a string value, as in the -// following: -// myMessage.setStringProperty("NumberOfOrders", "2"); -// The following expression in a message selector would evaluate to false, -// because a string cannot be used in an arithmetic expression: -// "NumberOfOrders > 1" -// � Identifiers are case sensitive. -// � Message header field references are restricted to JMSDeliveryMode, -// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and -// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be -// null and if so are treated as a NULL value. - - if (Boolean.getBoolean("strict-jms")) - { - // JMS start character - if (!(Character.isJavaIdentifierStart(propertyName.charAt(0)))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character"); - } - - // JMS part character - int length = propertyName.length(); - for (int c = 1; c < length; c++) - { - if (!(Character.isJavaIdentifierPart(propertyName.charAt(c)))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character"); - } - } - - // JMS invalid names - if ((propertyName.equals("NULL") - || propertyName.equals("TRUE") - || propertyName.equals("FALSE") - || propertyName.equals("NOT") - || propertyName.equals("AND") - || propertyName.equals("OR") - || propertyName.equals("BETWEEN") - || propertyName.equals("LIKE") - || propertyName.equals("IN") - || propertyName.equals("IS") - || propertyName.equals("ESCAPE"))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS"); - } - } - - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import java.util.Enumeration; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; + +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQPInvalidClassException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + + +public final class JMSHeaderAdapter +{ + private final FieldTable _headers; + + public JMSHeaderAdapter(FieldTable headers) + { + _headers = headers; + } + + + public FieldTable getHeaders() + { + return _headers; + } + + public boolean getBoolean(String string) throws JMSException + { + checkPropertyName(string); + Boolean b = getHeaders().getBoolean(string); + + if (b == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getBoolean can't use " + string + " item."); + } + else + { + return Boolean.valueOf((String) str); + } + } + else + { + b = Boolean.valueOf(null); + } + } + + return b; + } + + public boolean getBoolean(AMQShortString string) throws JMSException + { + checkPropertyName(string); + Boolean b = getHeaders().getBoolean(string); + + if (b == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getBoolean can't use " + string + " item."); + } + else + { + return Boolean.valueOf((String) str); + } + } + else + { + b = Boolean.valueOf(null); + } + } + + return b; + } + + public char getCharacter(String string) throws JMSException + { + checkPropertyName(string); + Character c = getHeaders().getCharacter(string); + + if (c == null) + { + if (getHeaders().isNullStringValue(string)) + { + throw new NullPointerException("Cannot convert null char"); + } + else + { + throw new MessageFormatException("getChar can't use " + string + " item."); + } + } + else + { + return (char) c; + } + } + + public byte[] getBytes(String string) throws JMSException + { + return getBytes(new AMQShortString(string)); + } + + public byte[] getBytes(AMQShortString string) throws JMSException + { + checkPropertyName(string); + + byte[] bs = getHeaders().getBytes(string); + + if (bs == null) + { + throw new MessageFormatException("getBytes can't use " + string + " item."); + } + else + { + return bs; + } + } + + public byte getByte(String string) throws JMSException + { + checkPropertyName(string); + Byte b = getHeaders().getByte(string); + if (b == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getByte can't use " + string + " item."); + } + else + { + return Byte.valueOf((String) str); + } + } + else + { + b = Byte.valueOf(null); + } + } + + return b; + } + + public short getShort(String string) throws JMSException + { + checkPropertyName(string); + Short s = getHeaders().getShort(string); + + if (s == null) + { + s = Short.valueOf(getByte(string)); + } + + return s; + } + + public int getInteger(String string) throws JMSException + { + checkPropertyName(string); + Integer i = getHeaders().getInteger(string); + + if (i == null) + { + i = Integer.valueOf(getShort(string)); + } + + return i; + } + + public long getLong(String string) throws JMSException + { + checkPropertyName(string); + Long l = getHeaders().getLong(string); + + if (l == null) + { + l = Long.valueOf(getInteger(string)); + } + + return l; + } + + public float getFloat(String string) throws JMSException + { + checkPropertyName(string); + Float f = getHeaders().getFloat(string); + + if (f == null) + { + if (getHeaders().containsKey(string)) + { + Object str = getHeaders().getObject(string); + + if (str == null || !(str instanceof String)) + { + throw new MessageFormatException("getFloat can't use " + string + " item."); + } + else + { + return Float.valueOf((String) str); + } + } + else + { + f = Float.valueOf(null); + } + + } + + return f; + } + + public double getDouble(String string) throws JMSException + { + checkPropertyName(string); + Double d = getHeaders().getDouble(string); + + if (d == null) + { + d = Double.valueOf(getFloat(string)); + } + + return d; + } + + public String getString(String string) throws JMSException + { + checkPropertyName(string); + String s = getHeaders().getString(string); + + if (s == null) + { + if (getHeaders().containsKey(string)) + { + Object o = getHeaders().getObject(string); + if (o instanceof byte[]) + { + throw new MessageFormatException("getObject couldn't find " + string + " item."); + } + else + { + if (o == null) + { + return null; + } + else + { + s = String.valueOf(o); + } + } + }//else return s // null; + } + + return s; + } + + public Object getObject(String string) throws JMSException + { + checkPropertyName(string); + return getHeaders().getObject(string); + } + + public void setBoolean(AMQShortString string, boolean b) throws JMSException + { + checkPropertyName(string); + getHeaders().setBoolean(string, b); + } + + public void setBoolean(String string, boolean b) throws JMSException + { + checkPropertyName(string); + getHeaders().setBoolean(string, b); + } + + public void setChar(String string, char c) throws JMSException + { + checkPropertyName(string); + getHeaders().setChar(string, c); + } + + public Object setBytes(AMQShortString string, byte[] bytes) + { + checkPropertyName(string); + return getHeaders().setBytes(string, bytes); + } + + public Object setBytes(String string, byte[] bytes) + { + checkPropertyName(string); + return getHeaders().setBytes(string, bytes); + } + + public Object setBytes(String string, byte[] bytes, int start, int length) + { + checkPropertyName(string); + return getHeaders().setBytes(string, bytes, start, length); + } + + public void setByte(String string, byte b) throws JMSException + { + checkPropertyName(string); + getHeaders().setByte(string, b); + } + + public void setByte(AMQShortString string, byte b) throws JMSException + { + checkPropertyName(string); + getHeaders().setByte(string, b); + } + + + public void setShort(String string, short i) throws JMSException + { + checkPropertyName(string); + getHeaders().setShort(string, i); + } + + public void setInteger(String string, int i) throws JMSException + { + checkPropertyName(string); + getHeaders().setInteger(string, i); + } + + public void setInteger(AMQShortString string, int i) throws JMSException + { + checkPropertyName(string); + getHeaders().setInteger(string, i); + } + + public void setLong(String string, long l) throws JMSException + { + checkPropertyName(string); + getHeaders().setLong(string, l); + } + + public void setFloat(String string, float v) throws JMSException + { + checkPropertyName(string); + getHeaders().setFloat(string, v); + } + + public void setDouble(String string, double v) throws JMSException + { + checkPropertyName(string); + getHeaders().setDouble(string, v); + } + + public void setString(String string, String string1) throws JMSException + { + checkPropertyName(string); + getHeaders().setString(string, string1); + } + + public void setString(AMQShortString string, String string1) throws JMSException + { + checkPropertyName(string); + getHeaders().setString(string, string1); + } + + public void setObject(String string, Object object) throws JMSException + { + checkPropertyName(string); + try + { + getHeaders().setObject(string, object); + } + catch (AMQPInvalidClassException aice) + { + MessageFormatException mfe = new MessageFormatException("Only primatives are allowed object is:" + object.getClass()); + mfe.setLinkedException(aice); + throw mfe; + } + } + + public boolean itemExists(String string) throws JMSException + { + checkPropertyName(string); + return getHeaders().containsKey(string); + } + + public Enumeration getPropertyNames() + { + return getHeaders().getPropertyNames(); + } + + public void clear() + { + getHeaders().clear(); + } + + public boolean propertyExists(AMQShortString propertyName) + { + checkPropertyName(propertyName); + return getHeaders().propertyExists(propertyName); + } + + public boolean propertyExists(String propertyName) + { + checkPropertyName(propertyName); + return getHeaders().propertyExists(propertyName); + } + + public Object put(Object key, Object value) + { + checkPropertyName(key.toString()); + return getHeaders().setObject(key.toString(), value); + } + + public Object remove(AMQShortString propertyName) + { + checkPropertyName(propertyName); + return getHeaders().remove(propertyName); + } + + public Object remove(String propertyName) + { + checkPropertyName(propertyName); + return getHeaders().remove(propertyName); + } + + public boolean isEmpty() + { + return getHeaders().isEmpty(); + } + + public void writeToBuffer(ByteBuffer data) + { + getHeaders().writeToBuffer(data); + } + + public Enumeration getMapNames() + { + return getPropertyNames(); + } + + protected static void checkPropertyName(CharSequence propertyName) + { + if (propertyName == null) + { + throw new IllegalArgumentException("Property name must not be null"); + } + else if (propertyName.length() == 0) + { + throw new IllegalArgumentException("Property name must not be the empty string"); + } + + checkIdentiferFormat(propertyName); + } + + protected static void checkIdentiferFormat(CharSequence propertyName) + { +// JMS requirements 3.5.1 Property Names +// Identifiers: +// - An identifier is an unlimited-length character sequence that must begin +// with a Java identifier start character; all following characters must be Java +// identifier part characters. An identifier start character is any character for +// which the method Character.isJavaIdentifierStart returns true. This includes +// '_' and '$'. An identifier part character is any character for which the +// method Character.isJavaIdentifierPart returns true. +// - Identifiers cannot be the names NULL, TRUE, or FALSE. +// � Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or +// ESCAPE. +// � Identifiers are either header field references or property references. The +// type of a property value in a message selector corresponds to the type +// used to set the property. If a property that does not exist in a message is +// referenced, its value is NULL. The semantics of evaluating NULL values +// in a selector are described in Section 3.8.1.2, �Null Values.� +// � The conversions that apply to the get methods for properties do not +// apply when a property is used in a message selector expression. For +// example, suppose you set a property as a string value, as in the +// following: +// myMessage.setStringProperty("NumberOfOrders", "2"); +// The following expression in a message selector would evaluate to false, +// because a string cannot be used in an arithmetic expression: +// "NumberOfOrders > 1" +// � Identifiers are case sensitive. +// � Message header field references are restricted to JMSDeliveryMode, +// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and +// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be +// null and if so are treated as a NULL value. + + if (Boolean.getBoolean("strict-jms")) + { + // JMS start character + if (!(Character.isJavaIdentifierStart(propertyName.charAt(0)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character"); + } + + // JMS part character + int length = propertyName.length(); + for (int c = 1; c < length; c++) + { + if (!(Character.isJavaIdentifierPart(propertyName.charAt(c)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character"); + } + } + + // JMS invalid names + if ((propertyName.equals("NULL") + || propertyName.equals("TRUE") + || propertyName.equals("FALSE") + || propertyName.equals("NOT") + || propertyName.equals("AND") + || propertyName.equals("OR") + || propertyName.equals("BETWEEN") + || propertyName.equals("LIKE") + || propertyName.equals("IN") + || propertyName.equals("IS") + || propertyName.equals("ESCAPE"))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS"); + } + } + + } +} diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java index 5185278eef..2c99b9a97b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java @@ -1,32 +1,32 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.state; - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.AMQException; - -public class AMQMethodNotImplementedException extends AMQException -{ - public AMQMethodNotImplementedException(AMQMethodBody body) - { - super(null, "Unexpected Method Received: " + body.getClass().getName(), null); - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.state; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.AMQException; + +public class AMQMethodNotImplementedException extends AMQException +{ + public AMQMethodNotImplementedException(AMQMethodBody body) + { + super(null, "Unexpected Method Received: " + body.getClass().getName(), null); + } +} diff --git a/java/client/src/main/java/org/apache/qpid/jms/Message.java b/java/client/src/main/java/org/apache/qpid/jms/Message.java index 6752ee616f..e65f9ad2f4 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/Message.java +++ b/java/client/src/main/java/org/apache/qpid/jms/Message.java @@ -1,28 +1,28 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.jms; - -import javax.jms.JMSException; - -public interface Message extends javax.jms.Message -{ - public void acknowledgeThis() throws JMSException; -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jms; + +import javax.jms.JMSException; + +public interface Message extends javax.jms.Message +{ + public void acknowledgeThis() throws JMSException; +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java index a5279a195b..1738db7239 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java @@ -1,104 +1,104 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.test.unit.basic; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.testutil.QpidTestCase; - -import javax.jms.Session; -import javax.jms.QueueSession; -import javax.jms.Queue; -import javax.jms.QueueSender; -import javax.jms.TextMessage; -import javax.jms.InvalidDestinationException; - -public class InvalidDestinationTest extends QpidTestCase -{ - private AMQConnection _connection; - - protected void setUp() throws Exception - { - super.setUp(); - _connection = (AMQConnection) getConnection("guest", "guest"); - } - - protected void tearDown() throws Exception - { - _connection.close(); - super.tearDown(); - } - - - - public void testInvalidDestination() throws Exception - { - Queue invalidDestination = new AMQQueue("amq.direct","unknownQ"); - AMQQueue validDestination = new AMQQueue("amq.direct","knownQ"); - QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - - // This is the only easy way to create and bind a queue from the API :-( - queueSession.createConsumer(validDestination); - - QueueSender sender = queueSession.createSender(invalidDestination); - TextMessage msg = queueSession.createTextMessage("Hello"); - try - { - sender.send(msg); - fail("Expected InvalidDestinationException"); - } - catch (InvalidDestinationException ex) - { - // pass - } - sender.close(); - - sender = queueSession.createSender(null); - invalidDestination = new AMQQueue("amq.direct","unknownQ"); - - try - { - sender.send(invalidDestination,msg); - fail("Expected InvalidDestinationException"); - } - catch (InvalidDestinationException ex) - { - // pass - } - sender.send(validDestination,msg); - sender.close(); - validDestination = new AMQQueue("amq.direct","knownQ"); - sender = queueSession.createSender(validDestination); - sender.send(msg); - - - - - } - - - public static junit.framework.Test suite() - { - - return new junit.framework.TestSuite(InvalidDestinationTest.class); - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.test.unit.basic; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.testutil.QpidTestCase; + +import javax.jms.Session; +import javax.jms.QueueSession; +import javax.jms.Queue; +import javax.jms.QueueSender; +import javax.jms.TextMessage; +import javax.jms.InvalidDestinationException; + +public class InvalidDestinationTest extends QpidTestCase +{ + private AMQConnection _connection; + + protected void setUp() throws Exception + { + super.setUp(); + _connection = (AMQConnection) getConnection("guest", "guest"); + } + + protected void tearDown() throws Exception + { + _connection.close(); + super.tearDown(); + } + + + + public void testInvalidDestination() throws Exception + { + Queue invalidDestination = new AMQQueue("amq.direct","unknownQ"); + AMQQueue validDestination = new AMQQueue("amq.direct","knownQ"); + QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + // This is the only easy way to create and bind a queue from the API :-( + queueSession.createConsumer(validDestination); + + QueueSender sender = queueSession.createSender(invalidDestination); + TextMessage msg = queueSession.createTextMessage("Hello"); + try + { + sender.send(msg); + fail("Expected InvalidDestinationException"); + } + catch (InvalidDestinationException ex) + { + // pass + } + sender.close(); + + sender = queueSession.createSender(null); + invalidDestination = new AMQQueue("amq.direct","unknownQ"); + + try + { + sender.send(invalidDestination,msg); + fail("Expected InvalidDestinationException"); + } + catch (InvalidDestinationException ex) + { + // pass + } + sender.send(validDestination,msg); + sender.close(); + validDestination = new AMQQueue("amq.direct","knownQ"); + sender = queueSession.createSender(validDestination); + sender.send(msg); + + + + + } + + + public static junit.framework.Test suite() + { + + return new junit.framework.TestSuite(InvalidDestinationTest.class); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java index 34197f2608..46b99fac8d 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java @@ -1,221 +1,221 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.test.unit.client.temporaryqueue; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TemporaryQueue; -import javax.jms.TextMessage; -import junit.framework.Assert; - -import org.apache.qpid.testutil.QpidTestCase; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.transport.TransportConnection; - -import java.util.List; -import java.util.LinkedList; - -public class TemporaryQueueTest extends QpidTestCase -{ - protected void setUp() throws Exception - { - super.setUp(); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - } - - protected Connection createConnection() throws Exception - { - return getConnection("guest", "guest"); - } - - public void testTempoaryQueue() throws Exception - { - Connection conn = createConnection(); - Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - TemporaryQueue queue = session.createTemporaryQueue(); - assertNotNull(queue); - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createConsumer(queue); - conn.start(); - producer.send(session.createTextMessage("hello")); - TextMessage tm = (TextMessage) consumer.receive(2000); - assertNotNull(tm); - assertEquals("hello", tm.getText()); - - try - { - queue.delete(); - fail("Expected JMSException : should not be able to delete while there are active consumers"); - } - catch (JMSException je) - { - ; //pass - } - - consumer.close(); - - try - { - queue.delete(); - } - catch (JMSException je) - { - fail("Unexpected Exception: " + je.getMessage()); - } - - conn.close(); - } - - public void tUniqueness() throws Exception - { - int numProcs = Runtime.getRuntime().availableProcessors(); - final int threadsProc = 5; - - runUniqueness(1, 10); - runUniqueness(numProcs * threadsProc, 10); - runUniqueness(numProcs * threadsProc, 100); - runUniqueness(numProcs * threadsProc, 500); - } - - void runUniqueness(int makers, int queues) throws Exception - { - Connection connection = createConnection(); - - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - List tqList = new LinkedList(); - - //Create Makers - for (int m = 0; m < makers; m++) - { - tqList.add(new TempQueueMaker(session, queues)); - } - - - List threadList = new LinkedList(); - - //Create Makers - for (TempQueueMaker maker : tqList) - { - threadList.add(new Thread(maker)); - } - - //Start threads - for (Thread thread : threadList) - { - thread.start(); - } - - // Join Threads - for (Thread thread : threadList) - { - try - { - thread.join(); - } - catch (InterruptedException e) - { - fail("Couldn't correctly join threads"); - } - } - - - List list = new LinkedList(); - - // Test values - for (TempQueueMaker maker : tqList) - { - check(maker, list); - } - - Assert.assertEquals("Not enough queues made.", makers * queues, list.size()); - - connection.close(); - } - - private void check(TempQueueMaker tq, List list) - { - for (AMQQueue q : tq.getList()) - { - if (list.contains(q)) - { - fail(q + " already exists."); - } - else - { - list.add(q); - } - } - } - - - class TempQueueMaker implements Runnable - { - List _queues; - Session _session; - private int _count; - - - TempQueueMaker(Session session, int queues) throws JMSException - { - _queues = new LinkedList(); - - _count = queues; - - _session = session; - } - - public void run() - { - int i = 0; - try - { - for (; i < _count; i++) - { - _queues.add((AMQQueue) _session.createTemporaryQueue()); - } - } - catch (JMSException jmse) - { - //stop - } - } - - List getList() - { - return _queues; - } - } - - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(TemporaryQueueTest.class); - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.test.unit.client.temporaryqueue; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TextMessage; +import junit.framework.Assert; + +import org.apache.qpid.testutil.QpidTestCase; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.transport.TransportConnection; + +import java.util.List; +import java.util.LinkedList; + +public class TemporaryQueueTest extends QpidTestCase +{ + protected void setUp() throws Exception + { + super.setUp(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + protected Connection createConnection() throws Exception + { + return getConnection("guest", "guest"); + } + + public void testTempoaryQueue() throws Exception + { + Connection conn = createConnection(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue queue = session.createTemporaryQueue(); + assertNotNull(queue); + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + conn.start(); + producer.send(session.createTextMessage("hello")); + TextMessage tm = (TextMessage) consumer.receive(2000); + assertNotNull(tm); + assertEquals("hello", tm.getText()); + + try + { + queue.delete(); + fail("Expected JMSException : should not be able to delete while there are active consumers"); + } + catch (JMSException je) + { + ; //pass + } + + consumer.close(); + + try + { + queue.delete(); + } + catch (JMSException je) + { + fail("Unexpected Exception: " + je.getMessage()); + } + + conn.close(); + } + + public void tUniqueness() throws Exception + { + int numProcs = Runtime.getRuntime().availableProcessors(); + final int threadsProc = 5; + + runUniqueness(1, 10); + runUniqueness(numProcs * threadsProc, 10); + runUniqueness(numProcs * threadsProc, 100); + runUniqueness(numProcs * threadsProc, 500); + } + + void runUniqueness(int makers, int queues) throws Exception + { + Connection connection = createConnection(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + List tqList = new LinkedList(); + + //Create Makers + for (int m = 0; m < makers; m++) + { + tqList.add(new TempQueueMaker(session, queues)); + } + + + List threadList = new LinkedList(); + + //Create Makers + for (TempQueueMaker maker : tqList) + { + threadList.add(new Thread(maker)); + } + + //Start threads + for (Thread thread : threadList) + { + thread.start(); + } + + // Join Threads + for (Thread thread : threadList) + { + try + { + thread.join(); + } + catch (InterruptedException e) + { + fail("Couldn't correctly join threads"); + } + } + + + List list = new LinkedList(); + + // Test values + for (TempQueueMaker maker : tqList) + { + check(maker, list); + } + + Assert.assertEquals("Not enough queues made.", makers * queues, list.size()); + + connection.close(); + } + + private void check(TempQueueMaker tq, List list) + { + for (AMQQueue q : tq.getList()) + { + if (list.contains(q)) + { + fail(q + " already exists."); + } + else + { + list.add(q); + } + } + } + + + class TempQueueMaker implements Runnable + { + List _queues; + Session _session; + private int _count; + + + TempQueueMaker(Session session, int queues) throws JMSException + { + _queues = new LinkedList(); + + _count = queues; + + _session = session; + } + + public void run() + { + int i = 0; + try + { + for (; i < _count; i++) + { + _queues.add((AMQQueue) _session.createTemporaryQueue()); + } + } + catch (JMSException jmse) + { + //stop + } + } + + List getList() + { + return _queues; + } + } + + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(TemporaryQueueTest.class); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java index d25986d991..54b2ee95f4 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java @@ -1,144 +1,144 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.close; - -import junit.framework.Assert; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.testutil.QpidTestCase; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.junit.concurrency.TestRunnable; -import org.apache.qpid.junit.concurrency.ThreadTestCoordinator; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.Session; - -/** - * This test forces the situation where a session is closed whilst a message consumer is still in its onMessage method. - * Running in AUTO_ACK mode, the close call ought to wait until the onMessage method completes, and the ack is sent - * before closing the connection. - * - *

CRC Card
Responsibilities Collaborations
Check that - * closing a connection whilst handling a message, blocks till completion of the handler.
- */ -public class CloseBeforeAckTest extends QpidTestCase -{ - private static final Logger log = LoggerFactory.getLogger(CloseBeforeAckTest.class); - - Connection connection; - Session session; - public static final String TEST_QUEUE_NAME = "TestQueue"; - private int TEST_COUNT = 25; - - class TestThread1 extends TestRunnable implements MessageListener - { - public void runWithExceptions() throws Exception - { - // Set this up to listen for message on the test session. - session.createConsumer(session.createQueue(TEST_QUEUE_NAME)).setMessageListener(this); - } - - public void onMessage(Message message) - { - // Give thread 2 permission to close the session. - allow(new int[] { 1 }); - - // Wait until thread 2 has closed the connection, or is blocked waiting for this to complete. - waitFor(new int[] { 1 }, true); - } - } - - TestThread1 testThread1 = new TestThread1(); - - TestRunnable testThread2 = - new TestRunnable() - { - public void runWithExceptions() throws Exception - { - // Send a message to be picked up by thread 1. - session.createProducer(null).send(session.createQueue(TEST_QUEUE_NAME), - session.createTextMessage("Hi there thread 1!")); - - // Wait for thread 1 to pick up the message and give permission to continue. - waitFor(new int[] { 0 }, false); - - // Close the connection. - session.close(); - - // Allow thread 1 to continue to completion, if it is erronously still waiting. - allow(new int[] { 1 }); - } - }; - - public void testCloseBeforeAutoAck_QPID_397() throws Exception - { - // Create a session in auto acknowledge mode. This problem shows up in auto acknowledge if the client acks - // message at the end of the onMessage method, after a close has been sent. - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - ThreadTestCoordinator tt = new ThreadTestCoordinator(2); - - tt.addTestThread(testThread1, 0); - tt.addTestThread(testThread2, 1); - tt.setDeadlockTimeout(500); - tt.run(); - - String errorMessage = tt.joinAndRetrieveMessages(); - - // Print any error messages or exceptions. - log.debug(errorMessage); - - if (!tt.getExceptions().isEmpty()) - { - for (Exception e : tt.getExceptions()) - { - log.debug("Exception thrown during test thread: ", e); - } - } - - Assert.assertTrue(errorMessage, "".equals(errorMessage)); - } - - public void closeBeforeAutoAckManyTimes() throws Exception - { - for (int i = 0; i < TEST_COUNT; i++) - { - testCloseBeforeAutoAck_QPID_397(); - } - } - - protected void setUp() throws Exception - { - super.setUp(); - connection = getConnection("guest", "guest"); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.unit.close; + +import junit.framework.Assert; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.testutil.QpidTestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.junit.concurrency.TestRunnable; +import org.apache.qpid.junit.concurrency.ThreadTestCoordinator; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; + +/** + * This test forces the situation where a session is closed whilst a message consumer is still in its onMessage method. + * Running in AUTO_ACK mode, the close call ought to wait until the onMessage method completes, and the ack is sent + * before closing the connection. + * + *

CRC Card
Responsibilities Collaborations
Check that + * closing a connection whilst handling a message, blocks till completion of the handler.
+ */ +public class CloseBeforeAckTest extends QpidTestCase +{ + private static final Logger log = LoggerFactory.getLogger(CloseBeforeAckTest.class); + + Connection connection; + Session session; + public static final String TEST_QUEUE_NAME = "TestQueue"; + private int TEST_COUNT = 25; + + class TestThread1 extends TestRunnable implements MessageListener + { + public void runWithExceptions() throws Exception + { + // Set this up to listen for message on the test session. + session.createConsumer(session.createQueue(TEST_QUEUE_NAME)).setMessageListener(this); + } + + public void onMessage(Message message) + { + // Give thread 2 permission to close the session. + allow(new int[] { 1 }); + + // Wait until thread 2 has closed the connection, or is blocked waiting for this to complete. + waitFor(new int[] { 1 }, true); + } + } + + TestThread1 testThread1 = new TestThread1(); + + TestRunnable testThread2 = + new TestRunnable() + { + public void runWithExceptions() throws Exception + { + // Send a message to be picked up by thread 1. + session.createProducer(null).send(session.createQueue(TEST_QUEUE_NAME), + session.createTextMessage("Hi there thread 1!")); + + // Wait for thread 1 to pick up the message and give permission to continue. + waitFor(new int[] { 0 }, false); + + // Close the connection. + session.close(); + + // Allow thread 1 to continue to completion, if it is erronously still waiting. + allow(new int[] { 1 }); + } + }; + + public void testCloseBeforeAutoAck_QPID_397() throws Exception + { + // Create a session in auto acknowledge mode. This problem shows up in auto acknowledge if the client acks + // message at the end of the onMessage method, after a close has been sent. + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ThreadTestCoordinator tt = new ThreadTestCoordinator(2); + + tt.addTestThread(testThread1, 0); + tt.addTestThread(testThread2, 1); + tt.setDeadlockTimeout(500); + tt.run(); + + String errorMessage = tt.joinAndRetrieveMessages(); + + // Print any error messages or exceptions. + log.debug(errorMessage); + + if (!tt.getExceptions().isEmpty()) + { + for (Exception e : tt.getExceptions()) + { + log.debug("Exception thrown during test thread: ", e); + } + } + + Assert.assertTrue(errorMessage, "".equals(errorMessage)); + } + + public void closeBeforeAutoAckManyTimes() throws Exception + { + for (int i = 0; i < TEST_COUNT; i++) + { + testCloseBeforeAutoAck_QPID_397(); + } + } + + protected void setUp() throws Exception + { + super.setUp(); + connection = getConnection("guest", "guest"); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } +} diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java index 0b9d0bdc2d..131cbd5f68 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java @@ -1,89 +1,89 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.test.unit.message; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.testutil.QpidTestCase; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; - -/** - * @author Apache Software Foundation - */ -public class JMSDestinationTest extends QpidTestCase -{ - private static final Logger _logger = LoggerFactory.getLogger(JMSDestinationTest.class); - - - protected void setUp() throws Exception - { - super.setUp(); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - } - - public void testJMSDestination() throws Exception - { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = - new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, - true); - MessageConsumer consumer = consumerSession.createConsumer(queue); - - Connection con2 = (AMQConnection) getConnection("guest", "guest"); - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); - - TextMessage sentMsg = producerSession.createTextMessage("hello"); - assertNull(sentMsg.getJMSDestination()); - - producer.send(sentMsg); - - assertEquals(sentMsg.getJMSDestination(), queue); - - con2.close(); - - con.start(); - - TextMessage rm = (TextMessage) consumer.receive(); - assertNotNull(rm); - - assertEquals(rm.getJMSDestination(), queue); - con.close(); - } - -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.test.unit.message; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.testutil.QpidTestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * @author Apache Software Foundation + */ +public class JMSDestinationTest extends QpidTestCase +{ + private static final Logger _logger = LoggerFactory.getLogger(JMSDestinationTest.class); + + + protected void setUp() throws Exception + { + super.setUp(); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + } + + public void testJMSDestination() throws Exception + { + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = + new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, + true); + MessageConsumer consumer = consumerSession.createConsumer(queue); + + Connection con2 = (AMQConnection) getConnection("guest", "guest"); + Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + + TextMessage sentMsg = producerSession.createTextMessage("hello"); + assertNull(sentMsg.getJMSDestination()); + + producer.send(sentMsg); + + assertEquals(sentMsg.getJMSDestination(), queue); + + con2.close(); + + con.start(); + + TextMessage rm = (TextMessage) consumer.receive(); + assertNotNull(rm); + + assertEquals(rm.getJMSDestination(), queue); + con.close(); + } + +} -- cgit v1.2.1