summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-01 18:09:05 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-01 18:09:05 +0000
commita07d2fb8b27654574695dbf1c8d077063bc04e41 (patch)
tree509cdc076116a22cb5e93ddb85a2ac52b922534f /qpid/java
parent1eb7a684fa571034d7242c696ea08c407acab897 (diff)
downloadqpid-python-a07d2fb8b27654574695dbf1c8d077063bc04e41.tar.gz
QPID-5439 : [AMQP 1.0 JMS Client] timeout rather than wait indefinitely when requiring response from server
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1554662 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java34
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java10
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java5
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java51
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java8
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java49
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java177
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java44
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java18
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java26
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java18
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java13
12 files changed, 343 insertions, 110 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
index 976ae10c56..f3ae849dac 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
@@ -370,21 +370,47 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
_lock.notifyAll();
}
-
+
+ List<JMSException> errors = new ArrayList<JMSException>();
+
if (sessions != null)
{
for(SessionImpl session : sessions)
{
- session.close();
+ try
+ {
+ session.close();
+ }
+ catch(JMSException e)
+ {
+ errors.add(e);
+ }
}
for(CloseTask task : closeTasks)
{
task.onClose();
}
- if(closeConnection) {
- _conn.close();
+ if(closeConnection)
+ {
+ try
+ {
+ _conn.close();
+ }
+ catch (ConnectionErrorException e)
+ {
+ final JMSException jmsException = new JMSException("Error while closing connection: " + e.getMessage());
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
}
}
+
+ if(!errors.isEmpty())
+ {
+ final JMSException jmsException = new JMSException("Error while closing connection: " + errors.get(0).getMessage());
+ jmsException.setLinkedException(errors.get(0));
+ throw jmsException;
+ }
}
private void checkClosed() throws IllegalStateException
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
index cee4f4f6f2..94a0eebbb4 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
@@ -34,6 +34,8 @@ import javax.jms.*;
import javax.jms.IllegalStateException;
import javax.jms.Message;
import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
import org.apache.qpid.amqp_1_0.type.messaging.Rejected;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
@@ -221,7 +223,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
}
catch (Sender.SenderClosingException e)
{
- final JMSException jmsException = new JMSException("error closing");
+ final JMSException jmsException = new JMSException("Error closing producer: " + e.getMessage());
jmsException.setLinkedException(e);
throw jmsException;
}
@@ -315,6 +317,12 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
jmsException.setLinkedException(e);
throw jmsException;
}
+ catch (TimeoutException e)
+ {
+ JMSException jmsException = new JMSException("Timed out while waiting to get credit to send");
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
if(_syncPublish && !action.wasAccepted(_syncPublishTimeout))
{
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
index 1e4bcfc7d7..e29323eb80 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.qpid.amqp_1_0.type.Section;
@@ -280,7 +282,8 @@ public class Respond extends Util
}
}
- private void respond(Message m) throws Sender.SenderCreationException, ConnectionClosedException, LinkDetachedException
+ private void respond(Message m)
+ throws Sender.SenderCreationException, ConnectionClosedException, LinkDetachedException, TimeoutException
{
List<Section> sections = m.getPayload();
String replyTo = null;
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
index 6074f9b868..fc0ff427a2 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.Principal;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -36,11 +37,13 @@ import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.Container;
+import org.apache.qpid.amqp_1_0.transport.Predicate;
import org.apache.qpid.amqp_1_0.transport.StateChangeListener;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
import org.apache.qpid.amqp_1_0.type.transport.Error;
@@ -357,22 +360,16 @@ public class Connection implements SocketExceptionHandler
return _conn;
}
- public void awaitOpen()
+ public void awaitOpen() throws TimeoutException, InterruptedException
{
- synchronized(getEndpoint().getLock())
+ getEndpoint().waitUntil(new Predicate()
{
- while(!getEndpoint().isOpen() && !getEndpoint().isClosed())
+ @Override
+ public boolean isSatisfied()
{
- try
- {
- getEndpoint().getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
+ return getEndpoint().isOpen() || getEndpoint().isClosed();
}
- }
+ });
}
@@ -417,24 +414,34 @@ public class Connection implements SocketExceptionHandler
}
}
- public void close()
+ public void close() throws ConnectionErrorException
{
_conn.close();
- synchronized (_conn.getLock())
+ try
{
- while(!_conn.closedForInput())
+ _conn.waitUntil(new Predicate()
{
- try
+ @Override
+ public boolean isSatisfied()
{
- _conn.getLock().wait();
- }
- catch (InterruptedException e)
- {
-
+ return _conn.closedForInput();
}
- }
+ });
}
+ catch (InterruptedException e)
+ {
+ throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR, "Interrupted while waiting for connection closure");
+ }
+ catch (TimeoutException e)
+ {
+ throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR, "Timed out while waiting for connection closure");
+ }
+ if(_conn.getRemoteError() != null)
+ {
+ throw new ConnectionErrorException(_conn.getRemoteError());
+ }
+
}
/**
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java
index baf3de8991..302060776a 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java
@@ -20,15 +20,21 @@
*/
package org.apache.qpid.amqp_1_0.client;
+import org.apache.qpid.amqp_1_0.type.ErrorCondition;
import org.apache.qpid.amqp_1_0.type.transport.Error;
public class ConnectionErrorException extends ConnectionException
{
protected final Error _remoteError;
+ public ConnectionErrorException(ErrorCondition condition,final String description)
+ {
+ this(new Error(condition, description));
+ }
+
public ConnectionErrorException(Error remoteError)
{
- super();
+ super(remoteError.getDescription());
_remoteError = remoteError;
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
index 5175d1d847..d76899a88b 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
@@ -23,6 +23,7 @@ package org.apache.qpid.amqp_1_0.client;
import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Predicate;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
@@ -38,6 +39,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Error;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
public class Receiver implements DeliveryStateHandler
{
@@ -137,36 +139,47 @@ public class Receiver implements DeliveryStateHandler
_endpoint.setLocalUnsettled(unsettled);
_endpoint.attach();
- synchronized(_endpoint.getLock())
+ try
{
- while(!_endpoint.isAttached() && !_endpoint.isDetached())
+ _endpoint.waitUntil(new Predicate()
{
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
+
+ @Override
+ public boolean isSatisfied()
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ return _endpoint.isAttached() || _endpoint.isDetached();
}
- }
+ });
+ }
+ catch (TimeoutException e)
+ {
+ throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Timeout waiting for attach");
+ }
+ catch (InterruptedException e)
+ {
+ throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Interrupted while waiting for attach");
}
if(_endpoint.getSource() == null)
{
- synchronized(_endpoint.getLock())
+ try
{
- while(!_endpoint.isDetached())
+ _endpoint.waitUntil(new Predicate()
{
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
+ @Override
+ public boolean isSatisfied()
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ return _endpoint.isDetached();
}
- }
+ });
+ }
+ catch (TimeoutException e)
+ {
+ throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Timeout waiting for detach following failed attach");
+ }
+ catch (InterruptedException e)
+ {
+ throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Interrupted whil waiting for detach following failed attach");
}
throw new ConnectionErrorException(getError());
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
index e891c5cbe7..afbabcf363 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
@@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructor;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Predicate;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
import org.apache.qpid.amqp_1_0.type.*;
@@ -39,10 +40,15 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
import org.apache.qpid.amqp_1_0.type.transport.Error;
public class Sender implements DeliveryStateHandler
{
+ private static final long UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER = 1000l;
+ private static final long DEFAULT_CREDIT_TIMEOUT = 30000l;
+
private SendingLinkEndpoint _endpoint;
private int _id;
private Session _session;
@@ -150,17 +156,26 @@ public class Sender implements DeliveryStateHandler
synchronized(_endpoint.getLock())
{
- while(!(_endpoint.isAttached() || _endpoint.isDetached()))
+ try
{
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
- {
- throw new SenderCreationException(e);
- }
+ _endpoint.waitUntil(new Predicate()
+ {
+ @Override
+ public boolean isSatisfied()
+ {
+ return _endpoint.isAttached() || _endpoint.isDetached();
+ }
+ });
+ }
+ catch (TimeoutException e)
+ {
+ throw new SenderCreationException(e);
}
+ catch (InterruptedException e)
+ {
+ throw new SenderCreationException(e);
+ }
+
if (session.getEndpoint().isEnded())
{
throw new SenderCreationException("Session is closed while creating link, target: " + target.getAddress());
@@ -225,22 +240,22 @@ public class Sender implements DeliveryStateHandler
return _endpoint.getTarget();
}
- public void send(Message message) throws LinkDetachedException
+ public void send(Message message) throws LinkDetachedException, TimeoutException
{
send(message, null, null);
}
- public void send(Message message, final OutcomeAction action) throws LinkDetachedException
+ public void send(Message message, final OutcomeAction action) throws LinkDetachedException, TimeoutException
{
send(message, null, action);
}
- public void send(Message message, final Transaction txn) throws LinkDetachedException
+ public void send(Message message, final Transaction txn) throws LinkDetachedException, TimeoutException
{
send(message, txn, null);
}
- public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
+ public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException, TimeoutException
{
List<Section> sections = message.getPayload();
@@ -290,19 +305,26 @@ public class Sender implements DeliveryStateHandler
xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED);
}
final Object lock = _endpoint.getLock();
+
synchronized(lock)
{
- while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
+
+ try
{
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
+ _endpoint.waitUntil(new Predicate()
+ {
+ @Override
+ public boolean isSatisfied()
+ {
+ return _endpoint.hasCreditToSend() || _endpoint.isDetached();
+ }
+ }, getCreditTimeout());
}
+ catch (InterruptedException e)
+ {
+ throw new TimeoutException("Interrupted while waiting for credit");
+ }
+
if(_endpoint.isDetached())
{
throw new LinkDetachedException(_error);
@@ -312,27 +334,24 @@ public class Sender implements DeliveryStateHandler
_outcomeActions.put(message.getDeliveryTag(), action);
}
_endpoint.transfer(xfr);
- //TODO - rationalise sending of flows
- // _endpoint.sendFlow();
}
if(_windowSize != 0)
{
- synchronized(lock)
+ try
{
-
-
- while(_endpoint.getUnsettledCount() >= _windowSize)
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
+ _endpoint.waitUntil(new Predicate()
+ {
+ @Override
+ public boolean isSatisfied()
+ {
+ return _endpoint.getUnsettledCount() < _windowSize;
+ }
+ }, getUnsettledTimeout());
+ }
+ catch (InterruptedException e)
+ {
+ throw new TimeoutException("Interrupted while waiting for the window to expand to allow sending");
}
}
@@ -340,26 +359,37 @@ public class Sender implements DeliveryStateHandler
}
+ private long getCreditTimeout()
+ {
+ return _endpoint.getSyncTimeout() < DEFAULT_CREDIT_TIMEOUT ? DEFAULT_CREDIT_TIMEOUT : _endpoint.getSyncTimeout();
+ }
+
public void close() throws SenderClosingException
{
+ boolean unsettledDeliveries = false;
if(_windowSize != 0)
{
- synchronized(_endpoint.getLock())
- {
-
+ long timeout = getUnsettledTimeout();
- while(_endpoint.getUnsettledCount() > 0)
+ try
+ {
+ _endpoint.waitUntil(new Predicate()
{
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
+ @Override
+ public boolean isSatisfied()
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ return _endpoint.getUnsettledCount() == 0;
}
- }
+ }, timeout);
+ }
+ catch (InterruptedException e)
+ {
+ unsettledDeliveries = true;
+ }
+ catch (TimeoutException e)
+ {
+ unsettledDeliveries = true;
}
}
@@ -368,20 +398,41 @@ public class Sender implements DeliveryStateHandler
_endpoint.detach();
_closed = true;
- synchronized(_endpoint.getLock())
+ try
{
- while(!_endpoint.isDetached())
+ _endpoint.waitUntil(new Predicate()
{
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
+ @Override
+ public boolean isSatisfied()
{
- throw new SenderClosingException(e);
+ return _endpoint.isDetached();
}
- }
+ });
+ }
+ catch (TimeoutException e)
+ {
+ throw new SenderClosingException("Timed out attempting to detach link", e);
}
+ catch (InterruptedException e)
+ {
+ throw new SenderClosingException("Interrupted while attempting to detach link", e);
+ }
+ if(unsettledDeliveries && _endpoint.getUnsettledCount() > 0)
+ {
+ throw new SenderClosingException("Some messages may not have been received by the recipient");
+ }
+ }
+
+ private long getUnsettledTimeout()
+ {
+ long timeout = _endpoint.getSyncTimeout();
+
+ // give a generous timeout where there are unsettled messages
+ if(timeout < _endpoint.getUnsettledCount() * UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER)
+ {
+ timeout = _endpoint.getUnsettledCount() * UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER;
+ }
+ return timeout;
}
public boolean isClosed()
@@ -468,10 +519,20 @@ public class Sender implements DeliveryStateHandler
public class SenderClosingException extends Exception
{
+ public SenderClosingException(final String message, final Throwable cause)
+ {
+ super(message, cause);
+ }
+
public SenderClosingException(Throwable e)
{
super(e);
}
+
+ public SenderClosingException(final String message)
+ {
+ super(message);
+ }
}
public static interface OutcomeAction
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
index a3c4ad7b5a..1c80668856 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
@@ -40,10 +40,8 @@ import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-import javax.security.sasl.SaslServerFactory;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -51,7 +49,7 @@ import java.nio.charset.Charset;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Enumeration;
+import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -71,6 +69,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
private static final short DEFAULT_CHANNEL_MAX = Integer.getInteger("amqp.channel_max", 255).shortValue();
private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15);
+ private static final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqp.connection_sync_timeout",5000l);
private ConnectionState _state = ConnectionState.UNOPENED;
@@ -122,6 +121,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
private Error _remoteError;
private Map _properties;
+ private long _syncTimeout = DEFAULT_SYNC_TIMEOUT;
public ConnectionEndpoint(Container container, SaslServerProvider cbs)
{
@@ -1054,4 +1054,42 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
{
_channelMax = channelMax;
}
+
+ public long getSyncTimeout()
+ {
+ return _syncTimeout;
+ }
+
+ public void setSyncTimeout(final long syncTimeout)
+ {
+ _syncTimeout = syncTimeout;
+ }
+
+ public void waitUntil(Predicate predicate) throws InterruptedException, TimeoutException
+ {
+ waitUntil(predicate, _syncTimeout);
+ }
+
+ public void waitUntil(Predicate predicate, long timeout) throws InterruptedException, TimeoutException
+ {
+ long endTime = System.currentTimeMillis() + timeout;
+
+ synchronized(getLock())
+ {
+ while(!predicate.isSatisfied())
+ {
+ getLock().wait(timeout);
+
+ if(!predicate.isSatisfied())
+ {
+ timeout = endTime - System.currentTimeMillis();
+ if(timeout <= 0l)
+ {
+ throw new TimeoutException();
+ }
+ }
+ }
+ }
+
+ }
}
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
index 32fffd545a..301dd0695a 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
@@ -28,6 +28,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Error;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeoutException;
public abstract class LinkEndpoint<T extends LinkEventListener>
{
@@ -324,6 +325,23 @@ public abstract class LinkEndpoint<T extends LinkEventListener>
return _session.getLock();
}
+
+ public long getSyncTimeout()
+ {
+ return _session.getSyncTimeout();
+ }
+
+ public void waitUntil(Predicate predicate) throws TimeoutException, InterruptedException
+ {
+ _session.waitUntil(predicate);
+ }
+
+ public void waitUntil(Predicate predicate, long timeout) throws TimeoutException, InterruptedException
+ {
+ _session.waitUntil(predicate, timeout);
+ }
+
+
public void attach()
{
synchronized(getLock())
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java
new file mode 100644
index 0000000000..3acd576527
--- /dev/null
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.transport;
+
+public interface Predicate
+{
+ boolean isSatisfied();
+}
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
index f7a3cd3800..34ca851978 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
@@ -35,6 +35,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Error;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.TimeoutException;
public class SessionEndpoint
{
@@ -618,6 +619,23 @@ catch(IllegalArgumentException e)
return _connection.getLock();
}
+
+ public long getSyncTimeout()
+ {
+ return _connection.getSyncTimeout();
+ }
+
+ public void waitUntil(Predicate predicate) throws TimeoutException, InterruptedException
+ {
+ _connection.waitUntil(predicate);
+ }
+
+ public void waitUntil(Predicate predicate, long timeout) throws TimeoutException, InterruptedException
+ {
+ _connection.waitUntil(predicate, timeout);
+ }
+
+
public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name,
String targetAddr,
String sourceAddr,
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java
index 6e1af84cc9..11319f738b 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java
@@ -31,8 +31,7 @@ import java.util.Map;
import org.apache.qpid.amqp_1_0.type.*;
public class Error
- {
-
+{
private ErrorCondition _condition;
@@ -40,6 +39,16 @@ public class Error
private Map _info;
+ public Error()
+ {
+ }
+
+ public Error(final ErrorCondition condition, final String description)
+ {
+ _condition = condition;
+ _description = description;
+ }
+
public ErrorCondition getCondition()
{
return _condition;