summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java123
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java2
3 files changed, 95 insertions, 61 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index d24ad46512..430a4bd9e9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -361,8 +361,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (!nowait)
{
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
}
@@ -382,9 +381,21 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
flushTask = null;
}
flushAcknowledgments();
- getQpidSession().sync();
- getQpidSession().close();
- getCurrentException();
+ try
+ {
+ getQpidSession().sync();
+ getQpidSession().close();
+ }
+ catch (SessionException se)
+ {
+ setCurrentException(se);
+ }
+
+ AMQException amqe = getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
}
@@ -403,7 +414,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().setAutoSync(false);
}
// We need to sync so that we get notify of an error.
- getCurrentException();
+ sync();
}
/**
@@ -426,8 +437,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
autoDelete ? Option.AUTO_DELETE : Option.NONE,
exclusive ? Option.EXCLUSIVE : Option.NONE);
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
/**
@@ -451,8 +461,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
@@ -566,7 +575,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
try
{
boolean isTopic;
-
+
if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
{
isTopic = consumer.getDestination() instanceof AMQTopic ||
@@ -583,7 +592,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
(isTopic || consumer.getMessageSelector() == null ||
consumer.getMessageSelector().equals(""));
}
-
+
getQpidSession().messageSubscribe
(queueName.toString(), String.valueOf(tag),
getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
@@ -607,7 +616,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
Option.UNRELIABLE);
-
+
if(capacity > 0 && _dispatcher != null && (isStarted() || _immediatePrefetch))
{
// set the flow
@@ -619,11 +628,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (!nowait)
{
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
}
-
+
private long getCapacity(AMQDestination destination)
{
long capacity = 0;
@@ -677,8 +685,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// We need to sync so that we get notify of an error.
if (!nowait)
{
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
}
@@ -710,7 +717,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
queueName = amqd.getAMQQueueName();
}
-
+
if (amqd.getDestSyntax() == DestSyntax.BURL)
{
Map<String,Object> arguments = new HashMap<String,Object>();
@@ -718,7 +725,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
arguments.put("no-local", true);
}
-
+
getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
amqd.isDurable() ? Option.DURABLE : Option.NONE,
@@ -733,13 +740,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
node.isDurable() ? Option.DURABLE : Option.NONE,
node.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
-
+
// passive --> false
if (!nowait)
{
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
return queueName;
}
@@ -753,8 +759,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
// ifEmpty --> false
// ifUnused --> false
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
/**
@@ -807,8 +812,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
@@ -816,8 +820,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
getQpidSession().txRollback();
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ sync();
}
//------ Private methods
@@ -835,19 +838,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* Get the latest thrown exception.
*
- * @throws org.apache.qpid.AMQException get the latest thrown error.
+ * @throws SessionException get the latest thrown error.
*/
- public void getCurrentException() throws AMQException
+ public AMQException getCurrentException()
{
+ AMQException amqe = null;
synchronized (_currentExceptionLock)
{
if (_currentException != null)
{
- AMQException amqe = _currentException;
+ amqe = _currentException;
_currentException = null;
- throw amqe;
}
}
+ return amqe;
}
public void opened(Session ssn) {}
@@ -872,22 +876,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void exception(Session ssn, SessionException exc)
{
- synchronized (_currentExceptionLock)
- {
- ExecutionException ee = exc.getException();
- int code;
- if (ee == null)
- {
- code = AMQConstant.INTERNAL_ERROR.getCode();
- }
- else
- {
- code = ee.getErrorCode().getValue();
- }
- AMQException amqe = new AMQException(AMQConstant.getConstant(code), exc.getMessage(), exc.getCause());
- _connection.exceptionReceived(amqe);
- _currentException = amqe;
- }
+ setCurrentException(exc);
}
public void closed(Session ssn) {}
@@ -1041,11 +1030,41 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
return Serial.lt((int) currentMark, (int) deliveryTag);
}
-
+
public void sync() throws AMQException
{
- _qpidSession.sync();
- getCurrentException();
+ try
+ {
+ getQpidSession().sync();
+ }
+ catch (SessionException se)
+ {
+ setCurrentException(se);
+ }
+
+ AMQException amqe = getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
+ }
+
+ public void setCurrentException(SessionException se)
+ {
+ synchronized (_currentExceptionLock)
+ {
+ ExecutionException ee = se.getException();
+ int code = AMQConstant.INTERNAL_ERROR.getCode();
+ if (ee != null)
+ {
+ code = ee.getErrorCode().getValue();
+ }
+ AMQException amqe = new AMQException(AMQConstant.getConstant(code), se.getMessage(), se.getCause());
+
+ _connection.exceptionReceived(amqe);
+
+ _currentException = amqe;
+ }
}
public AMQMessageDelegateFactory getMessageDelegateFactory()
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index a942d808a9..eddaa1a6bb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -168,16 +168,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
@Override void sendCancel() throws AMQException
{
- ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString());
- ((AMQSession_0_10) getSession()).getQpidSession().sync();
- // confirm cancel
- getSession().confirmConsumerCancelled(getConsumerTag());
- ((AMQSession_0_10) getSession()).getCurrentException();
+ _0_10session.getQpidSession().messageCancel(getConsumerTagString());
+ try
+ {
+ _0_10session.getQpidSession().sync();
+ getSession().confirmConsumerCancelled(getConsumerTag()); // confirm cancel
+ }
+ catch (SessionException se)
+ {
+ _0_10session.setCurrentException(se);
+ }
+
+ AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
}
@Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)
{
-
super.notifyMessage(messageFrame);
}
@@ -285,7 +295,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_0_10session.messageAcknowledge
(ranges,
_acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
- _0_10session.getCurrentException();
+
+ AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
}
}
@@ -302,7 +317,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
RangeSet ranges = new RangeSet();
ranges.add((int) message.getDeliveryTag());
_0_10session.getQpidSession().messageRelease(ranges);
- _0_10session.getCurrentException();
+ _0_10session.sync();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index df59be25d0..14e1601993 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -266,7 +266,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
return _destination;
}
- public void close() throws JMSException
+ public void close()
{
_closed.set(true);
_session.deregisterProducer(_producerId);