From c4d02ad9f945f41b8655ce18463155696f23e2e3 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 17 Jan 2007 16:54:19 +0000 Subject: added error handling and resolved compilation errors git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497078 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 65 +++++++++++++++------- 1 file changed, 45 insertions(+), 20 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 73dfdd90e5..2dfd864507 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -223,11 +223,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (message.content != null) { - final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.content.consumerTag); + final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.contentHeader.getDestination()); if (consumer == null) { - _logger.warn("Received a message from queue " + message.content.consumerTag + " without a handler - ignoring..."); + _logger.warn("Received a message from queue " + message.contentHeader.getDestination() + " without a handler - ignoring..."); _logger.warn("Consumers that exist: " + _consumers); _logger.warn("Session hashcode: " + System.identityHashCode(this)); } @@ -245,8 +245,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // Bounced message is processed here, away from the mina thread AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, false, - message.contentHeader, - message.bodies); + e message.contentHeader, + message.content); int errorCode = message.bounceBody.replyCode; String reason = message.bounceBody.replyText; @@ -316,12 +316,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, new FlowControllingBlockingQueue.ThresholdListener() { - public void aboveThreshold(int currentValue) + public void aboveThreshold(int currentValue) { if (_acknowledgeMode == NO_ACKNOWLEDGE) { _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue); - suspendChannel(); + try{ + suspendChannel(); + }catch (AMQException e) { + _logger.error("FlowControllingBlockingQueue,aboveThreshold, Cannot Suspend the channel",e); + } } } @@ -330,7 +334,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (_acknowledgeMode == NO_ACKNOWLEDGE) { _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); - unsuspendChannel(); + try { + unsuspendChannel(); + } catch (AMQException e) { + _logger.error("FlowControllingBlockingQueue,underThreshold, Cannot Unsuspend the channel",e); + } } } }); @@ -767,9 +775,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - _connection.getProtocolHandler().writeRequest(_channelId, - MessageRecoverBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor) - false)); // requeue + try { + _connection.getProtocolHandler().writeRequest(_channelId, + MessageRecoverBody.createMethodBody((byte)0, (byte)9, // AMQP version (major, minor) + false)); // requeue + } catch (AMQException e) { + _logger.error("Error recovering",e); + JMSException ex = new JMSException("Error Recovering"); + ex.initCause(e); + throw ex; + } } boolean isInRecovery() @@ -1094,7 +1109,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } - public void declareExchange(String name, String type) + public void declareExchange(String name, String type) throws AMQException { declareExchange(name, type, _connection.getProtocolHandler()); } @@ -1118,12 +1133,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().syncWrite(_channelId, methodBody, ExchangeDeclareOkBody.class); } - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) + private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler)throws AMQException { declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler); } - private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler) + private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler) throws AMQException { // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -1139,6 +1154,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, // passive 0, // ticket type); // type + protocolHandler.writeRequest(_channelId, methodBody); } @@ -1586,8 +1602,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * @param deliveryTag the tag of the last message to be acknowledged * @param multiple if true will acknowledge all messages up to and including the one specified by the * delivery tag + * @throws AMQException */ - public void acknowledgeMessage(long requestId, boolean multiple) + public void acknowledgeMessage(long requestId, boolean multiple) throws AMQException { // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -1626,8 +1643,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { if (_dispatcher != null) { - //then we stopped this and are restarting, so signal server to resume delivery - unsuspendChannel(); + try{ + //then we stopped this and are restarting, so signal server to resume delivery + unsuspendChannel(); + }catch(AMQException e){ + _logger.error("Error Un Suspending Channel", e); + } } _dispatcher = new Dispatcher(); _dispatcher.setDaemon(true); @@ -1637,8 +1658,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi void stop() { //stop the server delivering messages to this session - suspendChannel(); - + try{ + suspendChannel(); + }catch(AMQException e){ + _logger.error("Error Suspending Channel", e); + } + //stop the dispatcher thread _stopped.set(true); } @@ -1750,7 +1775,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - private void suspendChannel() + private void suspendChannel() throws AMQException { _logger.warn("Suspending channel"); // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) @@ -1762,7 +1787,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().writeRequest(_channelId, methodBody); } - private void unsuspendChannel() + private void unsuspendChannel() throws AMQException { _logger.warn("Unsuspending channel"); // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) -- cgit v1.2.1