summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-05-11 13:07:11 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-05-11 13:07:11 +0000
commit0b22baa11318fc7e86c9d1b9b74ad3d83e276859 (patch)
treefeec0dde57566d51d7d1cea42488d739230ebb7b /qpid/java
parent14d3de969b10a86fa8c1531ea22db0d9143d1b3e (diff)
downloadqpid-python-0b22baa11318fc7e86c9d1b9b74ad3d83e276859.tar.gz
QPID-4830 : [JMS AMQP 1.0] Improve error handling in the JMS client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1481321 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java29
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java17
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java86
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java66
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java37
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java9
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java19
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java15
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java12
-rw-r--r--qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java11
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java40
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java44
12 files changed, 312 insertions, 73 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
new file mode 100644
index 0000000000..bc2b6349c8
--- /dev/null
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+public class MessageRejectedException extends JMSException
+{
+ public MessageRejectedException(String s)
+ {
+ super(s);
+ }
+}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
index cb42c49bbc..b6e11ab44e 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
@@ -220,12 +220,14 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
return receiveImpl(0L);
}
- private MessageImpl receiveImpl(long timeout) throws IllegalStateException
+ private MessageImpl receiveImpl(long timeout) throws JMSException
{
+
org.apache.qpid.amqp_1_0.client.Message msg;
boolean redelivery;
if(_replaymessages.isEmpty())
{
+ checkReceiverError();
msg = receive0(timeout);
redelivery = false;
}
@@ -242,8 +244,21 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
return createJMSMessage(msg, redelivery);
}
+ void checkReceiverError() throws JMSException
+ {
+ final Error receiverError = _receiver.getError();
+ if(receiverError != null)
+ {
+ JMSException jmsException =
+ new JMSException(receiverError.getDescription(), receiverError.getCondition().toString());
+
+ throw jmsException;
+ }
+ }
+
Message receive0(final long timeout)
{
+
Message message = _receiver.receive(timeout);
if(_session.getAckModeEnum() == Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE)
{
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
index 77544e4112..3b44bae84b 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
@@ -19,17 +19,21 @@
package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
+import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.MessageProducer;
+import org.apache.qpid.amqp_1_0.jms.MessageRejectedException;
import org.apache.qpid.amqp_1_0.jms.QueueSender;
import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.util.UUID;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
{
@@ -43,6 +47,8 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
private SessionImpl _session;
private Sender _sender;
private boolean _closed;
+ private boolean _syncPublish = Boolean.getBoolean("qpid.sync_publish");
+ private long _syncPublishTimeout = Long.getLong("qpid.sync_publish_timeout", 30000l);
protected MessageProducerImpl(final Destination destination,
final SessionImpl session) throws JMSException
@@ -251,7 +257,28 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
- _sender.send(clientMessage, _session.getTxn());
+ DispositionAction action = null;
+
+ if(_syncPublish)
+ {
+ action = new DispositionAction(_sender);
+ }
+
+ try
+ {
+ _sender.send(clientMessage, _session.getTxn(), action);
+ }
+ catch (LinkDetachedException e)
+ {
+ JMSException jmsException = new InvalidDestinationException("Sender has been closed");
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+
+ if(_syncPublish && !action.wasAccepted(_syncPublishTimeout + System.currentTimeMillis()))
+ {
+ throw new MessageRejectedException("Message was rejected");
+ }
if(getDestination() != null)
{
@@ -377,4 +404,61 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
{
send(topic, message, deliveryMode, priority, ttl);
}
+
+ private static class DispositionAction implements Sender.OutcomeAction
+ {
+ private final Sender _sender;
+ private final Object _lock;
+ private Outcome _outcome;
+
+ public DispositionAction(Sender sender)
+ {
+ _sender = sender;
+ _lock = sender.getEndpoint().getLock();
+ }
+
+ @Override
+ public void onOutcome(Binary deliveryTag, Outcome outcome)
+ {
+ synchronized (_lock)
+ {
+ _outcome = outcome;
+ _lock.notifyAll();
+ }
+ }
+
+ public boolean wasAccepted(long timeout) throws JMSException
+ {
+ synchronized(_lock)
+ {
+ while(_outcome == null && !_sender.getEndpoint().isDetached())
+ {
+ try
+ {
+ _lock.wait(timeout - System.currentTimeMillis());
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ if(_outcome == null)
+ {
+
+ if(_sender.getEndpoint().isDetached())
+ {
+ throw new JMSException("Link was detached");
+ }
+ else
+ {
+ throw new JMSException("Timed out waiting for message acceptance");
+ }
+ }
+ else
+ {
+ return _outcome instanceof Accepted;
+ }
+ }
+ }
+ }
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index ccd5f01909..0b175f3b27 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -21,10 +21,12 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
@@ -52,9 +54,11 @@ import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.jms.TopicSession;
import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
public class SessionImpl implements Session, QueueSession, TopicSession
{
@@ -90,6 +94,45 @@ public class SessionImpl implements Session, QueueSession, TopicSession
jmsException.setLinkedException(e);
throw jmsException;
}
+ _session.getEndpoint().setSessionEventListener(new SessionEventListener.DefaultSessionEventListener()
+ {
+ @Override
+ public void remoteEnd(End end)
+ {
+ if(!_closed)
+ {
+ try
+ {
+ close();
+ }
+ catch (JMSException e)
+ {
+ }
+ try
+ {
+ final Error error = end.getError();
+ final ExceptionListener exceptionListener = _connection.getExceptionListener();
+ if(exceptionListener != null)
+ {
+ if(error != null)
+ {
+ exceptionListener.onException(new JMSException(error.getDescription(),
+ error.getCondition().toString()));
+ }
+ else
+ {
+ exceptionListener.onException(new JMSException("Session remotely closed"));
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+
+ }
+
+ }
+ }
+ });
if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED)
{
_txn = _session.createSessionLocalTransaction();
@@ -846,7 +889,28 @@ public class SessionImpl implements Session, QueueSession, TopicSession
}
}
+ Iterator<MessageConsumerImpl> consumers = _consumers.iterator();
+ while(consumers.hasNext())
+ {
+ MessageConsumerImpl consumer = consumers.next();
+ try
+ {
+ consumer.checkReceiverError();
+ }
+ catch (JMSException e)
+ {
+ consumers.remove();
+ try
+ {
+ _connection.getExceptionListener().onException(e);
+ consumer.close();
+ }
+ catch (JMSException e1)
+ {
+ }
+ }
+ }
}
}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
index c3193f9fea..5e77b7097c 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
@@ -173,7 +173,14 @@ public class Demo extends Util
Section[] sections = { properties, appProperties, amqpValue};
final Message message1 = new Message(Arrays.asList(sections));
- s.send(message1);
+ try
+ {
+ s.send(message1);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>();
Map<Object, Receiver> receivingLinks = new HashMap<Object, Receiver>();
@@ -295,7 +302,14 @@ public class Demo extends Util
m2propmap.put(VENDOR, vendor);
ApplicationProperties m2appProps = new ApplicationProperties(m2propmap);
Message m2 = new Message(Arrays.asList(m2props, m2appProps, new AmqpValue("AMQP-"+messageId)));
- sender.send(m2);
+ try
+ {
+ sender.send(m2);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
Map m3propmap = new HashMap();
m3propmap.put(OPCODE, LOG);
@@ -307,8 +321,14 @@ public class Demo extends Util
Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
new AmqpValue("AMQP-"+messageId)));
- s.send(m3);
-
+ try
+ {
+ s.send(m3);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
responseReceiver.acknowledge(m);
@@ -336,7 +356,14 @@ public class Demo extends Util
Message m3 = new Message(Arrays.asList(new ApplicationProperties(m3propmap),
new AmqpValue("AMQP-"+mp.getMessageId())));
- s.send(m3);
+ try
+ {
+ s.send(m3);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
entry.getValue().acknowledge(m);
}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
index 998d68cfa6..06440b8f19 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
@@ -121,14 +121,7 @@ public class Dump extends Util
session.close();
conn.close();
- } catch (ConnectionException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (Sender.SenderClosingException e)
+ } catch (Exception e)
{
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
index 37550ea52f..c7bcd99312 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
@@ -248,27 +248,10 @@ public class Filesender extends Util
session.close();
conn.close();
}
- catch (ConnectionException e)
+ catch (Exception e)
{
e.printStackTrace();
}
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace();
- } catch (FileNotFoundException e)
- {
- e.printStackTrace();
- } catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (NoSuchAlgorithmException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
-
}
private Message createMessageFromFile(MessageDigest md5, String fileName, File file) throws IOException
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
index d8da58dc76..dbe273182f 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
@@ -216,23 +216,10 @@ public class Request extends Util
conn2.close();
}
}
- catch (ConnectionException e)
+ catch (Exception e)
{
e.printStackTrace(); //TODO.
}
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
-
}
protected boolean hasSingleLinkPerConnectionMode()
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
index 6b1b70476e..1e4bcfc7d7 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
@@ -274,21 +274,13 @@ public class Respond extends Util
_conn.close();
System.out.println("Received: " + receivedCount);
}
- catch (ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
+ catch (Exception e)
{
e.printStackTrace(); //TODO.
}
}
- private void respond(Message m) throws Sender.SenderCreationException, ConnectionClosedException
+ private void respond(Message m) throws Sender.SenderCreationException, ConnectionClosedException, LinkDetachedException
{
List<Section> sections = m.getPayload();
String replyTo = null;
diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
index ef1a31005c..b4ae16ab3f 100644
--- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
+++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
@@ -219,19 +219,10 @@ public class Send extends Util
session.close();
conn.close();
}
- catch (Sender.SenderClosingException e)
+ catch (Exception e)
{
e.printStackTrace(); //TODO.
}
- catch (ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
-
}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
new file mode 100644
index 0000000000..45b00255f2
--- /dev/null
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+public class LinkDetachedException extends Exception
+{
+ private final org.apache.qpid.amqp_1_0.type.transport.Error _remoteError;
+
+ public LinkDetachedException(Error remoteError)
+ {
+ super();
+ _remoteError = remoteError;
+ }
+
+ public org.apache.qpid.amqp_1_0.type.transport.Error getRemoteError()
+ {
+ return _remoteError;
+ }
+
+}
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
index cf9f44af75..0600c18474 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
@@ -22,7 +22,9 @@ package org.apache.qpid.amqp_1_0.client;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.Source;
import org.apache.qpid.amqp_1_0.type.Target;
@@ -35,6 +37,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
public class Sender implements DeliveryStateHandler
{
@@ -44,6 +47,7 @@ public class Sender implements DeliveryStateHandler
private int _windowSize;
private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
private boolean _closed;
+ private Error _error;
public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
throws SenderCreationException, ConnectionClosedException
@@ -166,6 +170,17 @@ public class Sender implements DeliveryStateHandler
throw new SenderCreationException("Peer did not create remote endpoint for link, target: " + target.getAddress());
};
}
+
+ _endpoint.setLinkEventListener(new SendingLinkListener.DefaultLinkEventListener()
+ {
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ _error = detach.getError();
+ super.remoteDetached(endpoint, detach);
+ }
+ });
}
public Source getSource()
@@ -178,22 +193,22 @@ public class Sender implements DeliveryStateHandler
return _endpoint.getTarget();
}
- public void send(Message message)
+ public void send(Message message) throws LinkDetachedException
{
send(message, null, null);
}
- public void send(Message message, final OutcomeAction action)
+ public void send(Message message, final OutcomeAction action) throws LinkDetachedException
{
send(message, null, action);
}
- public void send(Message message, final Transaction txn)
+ public void send(Message message, final Transaction txn) throws LinkDetachedException
{
send(message, txn, null);
}
- public void send(Message message, final Transaction txn, OutcomeAction action)
+ public void send(Message message, final Transaction txn, OutcomeAction action) throws LinkDetachedException
{
List<Section> sections = message.getPayload();
@@ -245,7 +260,7 @@ public class Sender implements DeliveryStateHandler
final Object lock = _endpoint.getLock();
synchronized(lock)
{
- while(!_endpoint.hasCreditToSend())
+ while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
{
try
{
@@ -256,6 +271,10 @@ public class Sender implements DeliveryStateHandler
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
+ if(_endpoint.isDetached())
+ {
+ throw new LinkDetachedException(_error);
+ }
if(action != null)
{
_outcomeActions.put(message.getDeliveryTag(), action);
@@ -352,6 +371,21 @@ public class Sender implements DeliveryStateHandler
_endpoint.updateDisposition(deliveryTag, state, true);
}
}
+ else if(state instanceof TransactionalState)
+ {
+ OutcomeAction action;
+
+ if((action = _outcomeActions.remove(deliveryTag)) != null)
+ {
+ action.onOutcome(deliveryTag, ((TransactionalState) state).getOutcome());
+ }
+
+ }
+ }
+
+ public SendingLinkEndpoint getEndpoint()
+ {
+ return _endpoint;
}
public Map<Binary, DeliveryState> getRemoteUnsettled()