diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-01 18:09:05 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-01 18:09:05 +0000 |
| commit | a07d2fb8b27654574695dbf1c8d077063bc04e41 (patch) | |
| tree | 509cdc076116a22cb5e93ddb85a2ac52b922534f /qpid/java/amqp-1-0-common | |
| parent | 1eb7a684fa571034d7242c696ea08c407acab897 (diff) | |
| download | qpid-python-a07d2fb8b27654574695dbf1c8d077063bc04e41.tar.gz | |
QPID-5439 : [AMQP 1.0 JMS Client] timeout rather than wait indefinitely when requiring response from server
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1554662 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/amqp-1-0-common')
5 files changed, 114 insertions, 5 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index a3c4ad7b5a..1c80668856 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -40,10 +40,8 @@ import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -import javax.security.sasl.SaslServerFactory; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -51,7 +49,7 @@ import java.nio.charset.Charset; import java.security.Principal; import java.util.ArrayList; import java.util.Arrays; -import java.util.Enumeration; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; @@ -71,6 +69,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour private static final short DEFAULT_CHANNEL_MAX = Integer.getInteger("amqp.channel_max", 255).shortValue(); private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15); + private static final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqp.connection_sync_timeout",5000l); private ConnectionState _state = ConnectionState.UNOPENED; @@ -122,6 +121,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour private Error _remoteError; private Map _properties; + private long _syncTimeout = DEFAULT_SYNC_TIMEOUT; public ConnectionEndpoint(Container container, SaslServerProvider cbs) { @@ -1054,4 +1054,42 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour { _channelMax = channelMax; } + + public long getSyncTimeout() + { + return _syncTimeout; + } + + public void setSyncTimeout(final long syncTimeout) + { + _syncTimeout = syncTimeout; + } + + public void waitUntil(Predicate predicate) throws InterruptedException, TimeoutException + { + waitUntil(predicate, _syncTimeout); + } + + public void waitUntil(Predicate predicate, long timeout) throws InterruptedException, TimeoutException + { + long endTime = System.currentTimeMillis() + timeout; + + synchronized(getLock()) + { + while(!predicate.isSatisfied()) + { + getLock().wait(timeout); + + if(!predicate.isSatisfied()) + { + timeout = endTime - System.currentTimeMillis(); + if(timeout <= 0l) + { + throw new TimeoutException(); + } + } + } + } + + } } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java index 32fffd545a..301dd0695a 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java @@ -28,6 +28,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Error; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeoutException; public abstract class LinkEndpoint<T extends LinkEventListener> { @@ -324,6 +325,23 @@ public abstract class LinkEndpoint<T extends LinkEventListener> return _session.getLock(); } + + public long getSyncTimeout() + { + return _session.getSyncTimeout(); + } + + public void waitUntil(Predicate predicate) throws TimeoutException, InterruptedException + { + _session.waitUntil(predicate); + } + + public void waitUntil(Predicate predicate, long timeout) throws TimeoutException, InterruptedException + { + _session.waitUntil(predicate, timeout); + } + + public void attach() { synchronized(getLock()) diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java new file mode 100644 index 0000000000..3acd576527 --- /dev/null +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.transport; + +public interface Predicate +{ + boolean isSatisfied(); +} diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index f7a3cd3800..34ca851978 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -35,6 +35,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Error; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.TimeoutException; public class SessionEndpoint { @@ -618,6 +619,23 @@ catch(IllegalArgumentException e) return _connection.getLock(); } + + public long getSyncTimeout() + { + return _connection.getSyncTimeout(); + } + + public void waitUntil(Predicate predicate) throws TimeoutException, InterruptedException + { + _connection.waitUntil(predicate); + } + + public void waitUntil(Predicate predicate, long timeout) throws TimeoutException, InterruptedException + { + _connection.waitUntil(predicate, timeout); + } + + public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name, String targetAddr, String sourceAddr, diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java index 6e1af84cc9..11319f738b 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java @@ -31,8 +31,7 @@ import java.util.Map; import org.apache.qpid.amqp_1_0.type.*; public class Error - { - +{ private ErrorCondition _condition; @@ -40,6 +39,16 @@ public class Error private Map _info; + public Error() + { + } + + public Error(final ErrorCondition condition, final String description) + { + _condition = condition; + _description = description; + } + public ErrorCondition getCondition() { return _condition; |
