diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-01 18:09:05 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-01 18:09:05 +0000 |
| commit | a07d2fb8b27654574695dbf1c8d077063bc04e41 (patch) | |
| tree | 509cdc076116a22cb5e93ddb85a2ac52b922534f /qpid/java | |
| parent | 1eb7a684fa571034d7242c696ea08c407acab897 (diff) | |
| download | qpid-python-a07d2fb8b27654574695dbf1c8d077063bc04e41.tar.gz | |
QPID-5439 : [AMQP 1.0 JMS Client] timeout rather than wait indefinitely when requiring response from server
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1554662 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
12 files changed, 343 insertions, 110 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 976ae10c56..f3ae849dac 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -370,21 +370,47 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect _lock.notifyAll(); } - + + List<JMSException> errors = new ArrayList<JMSException>(); + if (sessions != null) { for(SessionImpl session : sessions) { - session.close(); + try + { + session.close(); + } + catch(JMSException e) + { + errors.add(e); + } } for(CloseTask task : closeTasks) { task.onClose(); } - if(closeConnection) { - _conn.close(); + if(closeConnection) + { + try + { + _conn.close(); + } + catch (ConnectionErrorException e) + { + final JMSException jmsException = new JMSException("Error while closing connection: " + e.getMessage()); + jmsException.setLinkedException(e); + throw jmsException; + } } } + + if(!errors.isEmpty()) + { + final JMSException jmsException = new JMSException("Error while closing connection: " + errors.get(0).getMessage()); + jmsException.setLinkedException(errors.get(0)); + throw jmsException; + } } private void checkClosed() throws IllegalStateException diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java index cee4f4f6f2..94a0eebbb4 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java @@ -34,6 +34,8 @@ import javax.jms.*; import javax.jms.IllegalStateException; import javax.jms.Message; import java.util.UUID; +import java.util.concurrent.TimeoutException; + import org.apache.qpid.amqp_1_0.type.messaging.Accepted; import org.apache.qpid.amqp_1_0.type.messaging.Rejected; import org.apache.qpid.amqp_1_0.type.messaging.Source; @@ -221,7 +223,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP } catch (Sender.SenderClosingException e) { - final JMSException jmsException = new JMSException("error closing"); + final JMSException jmsException = new JMSException("Error closing producer: " + e.getMessage()); jmsException.setLinkedException(e); throw jmsException; } @@ -315,6 +317,12 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP jmsException.setLinkedException(e); throw jmsException; } + catch (TimeoutException e) + { + JMSException jmsException = new JMSException("Timed out while waiting to get credit to send"); + jmsException.setLinkedException(e); + throw jmsException; + } if(_syncPublish && !action.wasAccepted(_syncPublishTimeout)) { diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java index 1e4bcfc7d7..e29323eb80 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.Random; +import java.util.concurrent.TimeoutException; + import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.qpid.amqp_1_0.type.Section; @@ -280,7 +282,8 @@ public class Respond extends Util } } - private void respond(Message m) throws Sender.SenderCreationException, ConnectionClosedException, LinkDetachedException + private void respond(Message m) + throws Sender.SenderCreationException, ConnectionClosedException, LinkDetachedException, TimeoutException { List<Section> sections = m.getPayload(); String replyTo = null; diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java index 6074f9b868..fc0ff427a2 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java @@ -26,6 +26,7 @@ import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; import java.security.Principal; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; @@ -36,11 +37,13 @@ import org.apache.qpid.amqp_1_0.framing.ConnectionHandler; import org.apache.qpid.amqp_1_0.transport.AMQPTransport; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.Container; +import org.apache.qpid.amqp_1_0.transport.Predicate; import org.apache.qpid.amqp_1_0.transport.StateChangeListener; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.SaslFrameBody; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.ConnectionError; import org.apache.qpid.amqp_1_0.type.transport.Error; @@ -357,22 +360,16 @@ public class Connection implements SocketExceptionHandler return _conn; } - public void awaitOpen() + public void awaitOpen() throws TimeoutException, InterruptedException { - synchronized(getEndpoint().getLock()) + getEndpoint().waitUntil(new Predicate() { - while(!getEndpoint().isOpen() && !getEndpoint().isClosed()) + @Override + public boolean isSatisfied() { - try - { - getEndpoint().getLock().wait(); - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } + return getEndpoint().isOpen() || getEndpoint().isClosed(); } - } + }); } @@ -417,24 +414,34 @@ public class Connection implements SocketExceptionHandler } } - public void close() + public void close() throws ConnectionErrorException { _conn.close(); - synchronized (_conn.getLock()) + try { - while(!_conn.closedForInput()) + _conn.waitUntil(new Predicate() { - try + @Override + public boolean isSatisfied() { - _conn.getLock().wait(); - } - catch (InterruptedException e) - { - + return _conn.closedForInput(); } - } + }); } + catch (InterruptedException e) + { + throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR, "Interrupted while waiting for connection closure"); + } + catch (TimeoutException e) + { + throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR, "Timed out while waiting for connection closure"); + } + if(_conn.getRemoteError() != null) + { + throw new ConnectionErrorException(_conn.getRemoteError()); + } + } /** diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java index baf3de8991..302060776a 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java @@ -20,15 +20,21 @@ */ package org.apache.qpid.amqp_1_0.client; +import org.apache.qpid.amqp_1_0.type.ErrorCondition; import org.apache.qpid.amqp_1_0.type.transport.Error; public class ConnectionErrorException extends ConnectionException { protected final Error _remoteError; + public ConnectionErrorException(ErrorCondition condition,final String description) + { + this(new Error(condition, description)); + } + public ConnectionErrorException(Error remoteError) { - super(); + super(remoteError.getDescription()); _remoteError = remoteError; } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index 5175d1d847..d76899a88b 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -23,6 +23,7 @@ package org.apache.qpid.amqp_1_0.client; import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.Predicate; import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener; @@ -38,6 +39,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Error; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeoutException; public class Receiver implements DeliveryStateHandler { @@ -137,36 +139,47 @@ public class Receiver implements DeliveryStateHandler _endpoint.setLocalUnsettled(unsettled); _endpoint.attach(); - synchronized(_endpoint.getLock()) + try { - while(!_endpoint.isAttached() && !_endpoint.isDetached()) + _endpoint.waitUntil(new Predicate() { - try - { - _endpoint.getLock().wait(); - } - catch (InterruptedException e) + + @Override + public boolean isSatisfied() { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + return _endpoint.isAttached() || _endpoint.isDetached(); } - } + }); + } + catch (TimeoutException e) + { + throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Timeout waiting for attach"); + } + catch (InterruptedException e) + { + throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Interrupted while waiting for attach"); } if(_endpoint.getSource() == null) { - synchronized(_endpoint.getLock()) + try { - while(!_endpoint.isDetached()) + _endpoint.waitUntil(new Predicate() { - try - { - _endpoint.getLock().wait(); - } - catch (InterruptedException e) + @Override + public boolean isSatisfied() { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + return _endpoint.isDetached(); } - } + }); + } + catch (TimeoutException e) + { + throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Timeout waiting for detach following failed attach"); + } + catch (InterruptedException e) + { + throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Interrupted whil waiting for detach following failed attach"); } throw new ConnectionErrorException(getError()); } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java index e891c5cbe7..afbabcf363 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java @@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructor; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; +import org.apache.qpid.amqp_1_0.transport.Predicate; import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; import org.apache.qpid.amqp_1_0.type.*; @@ -39,10 +40,15 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeoutException; + import org.apache.qpid.amqp_1_0.type.transport.Error; public class Sender implements DeliveryStateHandler { + private static final long UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER = 1000l; + private static final long DEFAULT_CREDIT_TIMEOUT = 30000l; + private SendingLinkEndpoint _endpoint; private int _id; private Session _session; @@ -150,17 +156,26 @@ public class Sender implements DeliveryStateHandler synchronized(_endpoint.getLock()) { - while(!(_endpoint.isAttached() || _endpoint.isDetached())) + try { - try - { - _endpoint.getLock().wait(); - } - catch (InterruptedException e) - { - throw new SenderCreationException(e); - } + _endpoint.waitUntil(new Predicate() + { + @Override + public boolean isSatisfied() + { + return _endpoint.isAttached() || _endpoint.isDetached(); + } + }); + } + catch (TimeoutException e) + { + throw new SenderCreationException(e); } + catch (InterruptedException e) + { + throw new SenderCreationException(e); + } + if (session.getEndpoint().isEnded()) { throw new SenderCreationException("Session is closed while creating link, target: " + target.getAddress()); @@ -225,22 +240,22 @@ public class Sender implements DeliveryStateHandler return _endpoint.getTarget(); } - public void send(Message message) throws LinkDetachedException + public void send(Message message) throws LinkDetachedException, TimeoutException { send(message, null, null); } - public void send(Message message, final OutcomeAction action) throws LinkDetachedException + public void send(Message message, final OutcomeAction action) throws LinkDetachedException, TimeoutException { send(message, null, action); } - public void send(Message message, final Transaction txn) throws LinkDetachedException + public void send(Message message, final Transaction txn) throws LinkDetachedException, TimeoutException { send(message, txn, null); } - public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException + public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException, TimeoutException { List<Section> sections = message.getPayload(); @@ -290,19 +305,26 @@ public class Sender implements DeliveryStateHandler xfr.setSettled(message.getSettled() || _endpoint.getSendingSettlementMode() == SenderSettleMode.SETTLED); } final Object lock = _endpoint.getLock(); + synchronized(lock) { - while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached()) + + try { - try - { - lock.wait(); - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } + _endpoint.waitUntil(new Predicate() + { + @Override + public boolean isSatisfied() + { + return _endpoint.hasCreditToSend() || _endpoint.isDetached(); + } + }, getCreditTimeout()); } + catch (InterruptedException e) + { + throw new TimeoutException("Interrupted while waiting for credit"); + } + if(_endpoint.isDetached()) { throw new LinkDetachedException(_error); @@ -312,27 +334,24 @@ public class Sender implements DeliveryStateHandler _outcomeActions.put(message.getDeliveryTag(), action); } _endpoint.transfer(xfr); - //TODO - rationalise sending of flows - // _endpoint.sendFlow(); } if(_windowSize != 0) { - synchronized(lock) + try { - - - while(_endpoint.getUnsettledCount() >= _windowSize) - { - try - { - lock.wait(); - } - catch (InterruptedException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } + _endpoint.waitUntil(new Predicate() + { + @Override + public boolean isSatisfied() + { + return _endpoint.getUnsettledCount() < _windowSize; + } + }, getUnsettledTimeout()); + } + catch (InterruptedException e) + { + throw new TimeoutException("Interrupted while waiting for the window to expand to allow sending"); } } @@ -340,26 +359,37 @@ public class Sender implements DeliveryStateHandler } + private long getCreditTimeout() + { + return _endpoint.getSyncTimeout() < DEFAULT_CREDIT_TIMEOUT ? DEFAULT_CREDIT_TIMEOUT : _endpoint.getSyncTimeout(); + } + public void close() throws SenderClosingException { + boolean unsettledDeliveries = false; if(_windowSize != 0) { - synchronized(_endpoint.getLock()) - { - + long timeout = getUnsettledTimeout(); - while(_endpoint.getUnsettledCount() > 0) + try + { + _endpoint.waitUntil(new Predicate() { - try - { - _endpoint.getLock().wait(); - } - catch (InterruptedException e) + @Override + public boolean isSatisfied() { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + return _endpoint.getUnsettledCount() == 0; } - } + }, timeout); + } + catch (InterruptedException e) + { + unsettledDeliveries = true; + } + catch (TimeoutException e) + { + unsettledDeliveries = true; } } @@ -368,20 +398,41 @@ public class Sender implements DeliveryStateHandler _endpoint.detach(); _closed = true; - synchronized(_endpoint.getLock()) + try { - while(!_endpoint.isDetached()) + _endpoint.waitUntil(new Predicate() { - try - { - _endpoint.getLock().wait(); - } - catch (InterruptedException e) + @Override + public boolean isSatisfied() { - throw new SenderClosingException(e); + return _endpoint.isDetached(); } - } + }); + } + catch (TimeoutException e) + { + throw new SenderClosingException("Timed out attempting to detach link", e); } + catch (InterruptedException e) + { + throw new SenderClosingException("Interrupted while attempting to detach link", e); + } + if(unsettledDeliveries && _endpoint.getUnsettledCount() > 0) + { + throw new SenderClosingException("Some messages may not have been received by the recipient"); + } + } + + private long getUnsettledTimeout() + { + long timeout = _endpoint.getSyncTimeout(); + + // give a generous timeout where there are unsettled messages + if(timeout < _endpoint.getUnsettledCount() * UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER) + { + timeout = _endpoint.getUnsettledCount() * UNSETTLED_MESSAGE_TIMEOUT_MULTIPLIER; + } + return timeout; } public boolean isClosed() @@ -468,10 +519,20 @@ public class Sender implements DeliveryStateHandler public class SenderClosingException extends Exception { + public SenderClosingException(final String message, final Throwable cause) + { + super(message, cause); + } + public SenderClosingException(Throwable e) { super(e); } + + public SenderClosingException(final String message) + { + super(message); + } } public static interface OutcomeAction diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index a3c4ad7b5a..1c80668856 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -40,10 +40,8 @@ import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -import javax.security.sasl.SaslServerFactory; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -51,7 +49,7 @@ import java.nio.charset.Charset; import java.security.Principal; import java.util.ArrayList; import java.util.Arrays; -import java.util.Enumeration; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; @@ -71,6 +69,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour private static final short DEFAULT_CHANNEL_MAX = Integer.getInteger("amqp.channel_max", 255).shortValue(); private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15); + private static final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqp.connection_sync_timeout",5000l); private ConnectionState _state = ConnectionState.UNOPENED; @@ -122,6 +121,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour private Error _remoteError; private Map _properties; + private long _syncTimeout = DEFAULT_SYNC_TIMEOUT; public ConnectionEndpoint(Container container, SaslServerProvider cbs) { @@ -1054,4 +1054,42 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour { _channelMax = channelMax; } + + public long getSyncTimeout() + { + return _syncTimeout; + } + + public void setSyncTimeout(final long syncTimeout) + { + _syncTimeout = syncTimeout; + } + + public void waitUntil(Predicate predicate) throws InterruptedException, TimeoutException + { + waitUntil(predicate, _syncTimeout); + } + + public void waitUntil(Predicate predicate, long timeout) throws InterruptedException, TimeoutException + { + long endTime = System.currentTimeMillis() + timeout; + + synchronized(getLock()) + { + while(!predicate.isSatisfied()) + { + getLock().wait(timeout); + + if(!predicate.isSatisfied()) + { + timeout = endTime - System.currentTimeMillis(); + if(timeout <= 0l) + { + throw new TimeoutException(); + } + } + } + } + + } } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java index 32fffd545a..301dd0695a 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java @@ -28,6 +28,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Error; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeoutException; public abstract class LinkEndpoint<T extends LinkEventListener> { @@ -324,6 +325,23 @@ public abstract class LinkEndpoint<T extends LinkEventListener> return _session.getLock(); } + + public long getSyncTimeout() + { + return _session.getSyncTimeout(); + } + + public void waitUntil(Predicate predicate) throws TimeoutException, InterruptedException + { + _session.waitUntil(predicate); + } + + public void waitUntil(Predicate predicate, long timeout) throws TimeoutException, InterruptedException + { + _session.waitUntil(predicate, timeout); + } + + public void attach() { synchronized(getLock()) diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java new file mode 100644 index 0000000000..3acd576527 --- /dev/null +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.transport; + +public interface Predicate +{ + boolean isSatisfied(); +} diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java index f7a3cd3800..34ca851978 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java @@ -35,6 +35,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Error; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.TimeoutException; public class SessionEndpoint { @@ -618,6 +619,23 @@ catch(IllegalArgumentException e) return _connection.getLock(); } + + public long getSyncTimeout() + { + return _connection.getSyncTimeout(); + } + + public void waitUntil(Predicate predicate) throws TimeoutException, InterruptedException + { + _connection.waitUntil(predicate); + } + + public void waitUntil(Predicate predicate, long timeout) throws TimeoutException, InterruptedException + { + _connection.waitUntil(predicate, timeout); + } + + public ReceivingLinkEndpoint createReceivingLinkEndpoint(final String name, String targetAddr, String sourceAddr, diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java index 6e1af84cc9..11319f738b 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java @@ -31,8 +31,7 @@ import java.util.Map; import org.apache.qpid.amqp_1_0.type.*; public class Error - { - +{ private ErrorCondition _condition; @@ -40,6 +39,16 @@ public class Error private Map _info; + public Error() + { + } + + public Error(final ErrorCondition condition, final String description) + { + _condition = condition; + _description = description; + } + public ErrorCondition getCondition() { return _condition; |
