summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-01-17 16:54:19 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-01-17 16:54:19 +0000
commitc4d02ad9f945f41b8655ce18463155696f23e2e3 (patch)
tree13e07b5d4ca26db74437ea0fa8c8e3bbfc383fb4 /java/client/src
parent2fec3c4f9ce8ba227f4c53d251aa6d11c7772a09 (diff)
downloadqpid-python-c4d02ad9f945f41b8655ce18463155696f23e2e3.tar.gz
added error handling and resolved compilation errors
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497078 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java65
1 files changed, 45 insertions, 20 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 73dfdd90e5..2dfd864507 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -223,11 +223,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (message.content != null)
{
- final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.content.consumerTag);
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.contentHeader.getDestination());
if (consumer == null)
{
- _logger.warn("Received a message from queue " + message.content.consumerTag + " without a handler - ignoring...");
+ _logger.warn("Received a message from queue " + message.contentHeader.getDestination() + " without a handler - ignoring...");
_logger.warn("Consumers that exist: " + _consumers);
_logger.warn("Session hashcode: " + System.identityHashCode(this));
}
@@ -245,8 +245,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
false,
- message.contentHeader,
- message.bodies);
+ e message.contentHeader,
+ message.content);
int errorCode = message.bounceBody.replyCode;
String reason = message.bounceBody.replyText;
@@ -316,12 +316,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
new FlowControllingBlockingQueue.ThresholdListener()
{
- public void aboveThreshold(int currentValue)
+ public void aboveThreshold(int currentValue)
{
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue);
- suspendChannel();
+ try{
+ suspendChannel();
+ }catch (AMQException e) {
+ _logger.error("FlowControllingBlockingQueue,aboveThreshold, Cannot Suspend the channel",e);
+ }
}
}
@@ -330,7 +334,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
_logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue);
- unsuspendChannel();
+ try {
+ unsuspendChannel();
+ } catch (AMQException e) {
+ _logger.error("FlowControllingBlockingQueue,underThreshold, Cannot Unsuspend the channel",e);
+ }
}
}
});
@@ -767,9 +775,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().writeRequest(_channelId,
- MessageRecoverBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor)
- false)); // requeue
+ try {
+ _connection.getProtocolHandler().writeRequest(_channelId,
+ MessageRecoverBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor)
+ false)); // requeue
+ } catch (AMQException e) {
+ _logger.error("Error recovering",e);
+ JMSException ex = new JMSException("Error Recovering");
+ ex.initCause(e);
+ throw ex;
+ }
}
boolean isInRecovery()
@@ -1094,7 +1109,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
- public void declareExchange(String name, String type)
+ public void declareExchange(String name, String type) throws AMQException
{
declareExchange(name, type, _connection.getProtocolHandler());
}
@@ -1118,12 +1133,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_connection.getProtocolHandler().syncWrite(_channelId, methodBody, ExchangeDeclareOkBody.class);
}
- private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler)
+ private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler)throws AMQException
{
declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
}
- private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler)
+ private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler) throws AMQException
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -1139,6 +1154,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false, // passive
0, // ticket
type); // type
+
protocolHandler.writeRequest(_channelId, methodBody);
}
@@ -1586,8 +1602,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* @param deliveryTag the tag of the last message to be acknowledged
* @param multiple if true will acknowledge all messages up to and including the one specified by the
* delivery tag
+ * @throws AMQException
*/
- public void acknowledgeMessage(long requestId, boolean multiple)
+ public void acknowledgeMessage(long requestId, boolean multiple) throws AMQException
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
@@ -1626,8 +1643,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
if (_dispatcher != null)
{
- //then we stopped this and are restarting, so signal server to resume delivery
- unsuspendChannel();
+ try{
+ //then we stopped this and are restarting, so signal server to resume delivery
+ unsuspendChannel();
+ }catch(AMQException e){
+ _logger.error("Error Un Suspending Channel", e);
+ }
}
_dispatcher = new Dispatcher();
_dispatcher.setDaemon(true);
@@ -1637,8 +1658,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
void stop()
{
//stop the server delivering messages to this session
- suspendChannel();
-
+ try{
+ suspendChannel();
+ }catch(AMQException e){
+ _logger.error("Error Suspending Channel", e);
+ }
+
//stop the dispatcher thread
_stopped.set(true);
}
@@ -1750,7 +1775,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- private void suspendChannel()
+ private void suspendChannel() throws AMQException
{
_logger.warn("Suspending channel");
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
@@ -1762,7 +1787,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_connection.getProtocolHandler().writeRequest(_channelId, methodBody);
}
- private void unsuspendChannel()
+ private void unsuspendChannel() throws AMQException
{
_logger.warn("Unsuspending channel");
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)