summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-04-22 19:15:39 +0000
committerRafael H. Schloming <rhs@apache.org>2008-04-22 19:15:39 +0000
commit24002d5c08143b714a9ed5ab0d2fa70b7ce25ddd (patch)
treece2c61627bda80468304257e1e4da6fdcabb958c
parent09742065d5d765f6bebf5fcd3a9aad9ef325b76d (diff)
downloadqpid-python-24002d5c08143b714a9ed5ab0d2fa70b7ce25ddd.tar.gz
QPID-832: moved more 0-8 specific code into 0-8 subclasses
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/thegreatmerge@650617 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java11
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java27
3 files changed, 11 insertions, 40 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 2e9aea3dcb..e7c607d95a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -546,18 +546,9 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
// The Synchronized block only needs to protect network traffic.
synchronized (_connection.getFailoverMutex())
{
- BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false);
-
- final AMQFrame cancelFrame = body.generateFrame(_channelId);
-
try
{
- _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("CancelOk'd for consumer:" + debugIdentity());
- }
+ sendCancel();
}
catch (AMQException e)
{
@@ -592,7 +583,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
}
}
- public abstract void sendCancel() throws JMSAMQException;
+ abstract void sendCancel() throws AMQException, FailoverException;
/**
* Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 6ec2f37f29..b976b23602 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -211,20 +211,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
* This method is invoked when this consumer is stopped.
* It tells the broker to stop delivering messages to this consumer.
*/
- public void sendCancel() throws JMSAMQException
+ void sendCancel() throws AMQException
{
((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTag().toString());
((AMQSession_0_10) getSession()).getQpidSession().sync();
// confirm cancel
getSession().confirmConsumerCancelled(getConsumerTag());
- try
- {
- ((AMQSession_0_10) getSession()).getCurrentException();
- }
- catch (AMQException e)
- {
- throw new JMSAMQException("Problem when stopping consumer", e);
- }
+ ((AMQSession_0_10) getSession()).getCurrentException();
}
void notifyMessage(UnprocessedMessage messageFrame, int channelId)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 1635c51573..5414e25539 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -51,30 +51,17 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader
acknowledgeMode, noConsume, autoClose);
}
- public void sendCancel() throws JMSAMQException
+ void sendCancel() throws AMQException, FailoverException
{
- final AMQFrame cancelFrame = _connection.getProtocolHandler().getMethodRegistry().
- createBasicCancelBody(_consumerTag, // consumerTag
- false). // nowait
- generateFrame(_channelId);
+ BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false);
- try
- {
- _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+ final AMQFrame cancelFrame = body.generateFrame(_channelId);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("CancelOk'd for consumer:" + debugIdentity());
- }
+ _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
- }
- catch (AMQException e)
+ if (_logger.isDebugEnabled())
{
- throw new JMSAMQException("Error closing consumer: " + e, e);
- }
- catch (FailoverException e)
- {
- throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
+ _logger.debug("CancelOk'd for consumer:" + debugIdentity());
}
}
@@ -86,5 +73,5 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader
messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
}
-
+
} \ No newline at end of file