diff options
| author | Alex Rudyy <orudyy@apache.org> | 2013-06-26 14:53:34 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2013-06-26 14:53:34 +0000 |
| commit | 8bdc5010fef590e5dc3c424b1f41f8df42c47c11 (patch) | |
| tree | 4805fd38f0f9a321581cfb0079e1ef2dd571e84e /qpid/java | |
| parent | 92067361c279257be2b2ee73484da61c9327c05a (diff) | |
| download | qpid-python-8bdc5010fef590e5dc3c424b1f41f8df42c47c11.tar.gz | |
QPID-4951: Add cause code and message into operational logs for session close initiated by the Broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1496951 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
14 files changed, 186 insertions, 9 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 8588aea2d4..4df40585d9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -60,6 +60,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.AMQPChannelActor; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -574,13 +575,21 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F @Override public void close() throws AMQException { + close(null, null); + } + + public void close(AMQConstant cause, String message) throws AMQException + { if(!_closing.compareAndSet(false, true)) { //Channel is already closing return; } - CurrentActor.get().message(_logSubject, ChannelMessages.CLOSE()); + LogMessage operationalLogMessage = cause == null ? + ChannelMessages.CLOSE() : + ChannelMessages.CLOSE_FORCED(cause.getCode(), message); + CurrentActor.get().message(_logSubject, operationalLogMessage); unsubscribeAllConsumers(); _transaction.rollback(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties index b5df212904..397c12d73c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties @@ -22,6 +22,8 @@ CREATE = CHN-1001 : Create # 0 - flow FLOW = CHN-1002 : Flow {0} CLOSE = CHN-1003 : Close +CLOSE_FORCED = CHN-1003 : Close : {0,number} - {1} + # 0 - bytes allowed in prefetch # 1 - number of messagse. PREFETCH_SIZE = CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index e9b0fd9f10..1b24671575 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -450,12 +450,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } catch (AMQException e) { - closeChannel(channelId); + closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage()); throw e; } catch (TransportException e) { - closeChannel(channelId); + closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage()); throw e; } @@ -601,7 +601,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } writeFrame(e.getCloseFrame(channelId)); - closeChannel(channelId); + closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage()); } else { @@ -824,6 +824,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi @Override public void closeChannel(int channelId) throws AMQException { + closeChannel(channelId, null, null); + } + + public void closeChannel(int channelId, AMQConstant cause, String message) throws AMQException + { final AMQChannel channel = getChannel(channelId); if (channel == null) { @@ -833,7 +838,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { try { - channel.close(); + channel.close(cause, message); markChannelAwaitingCloseOk(channelId); } finally @@ -1490,7 +1495,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException { int channelId = ((AMQChannel)session).getChannelId(); - closeChannel(channelId); + closeChannel(channelId, cause, message); MethodRegistry methodRegistry = getMethodRegistry(); ChannelCloseBody responseBody = diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 1842117d6f..6fa497c853 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -33,6 +33,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodDispatcher; import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.logging.LogActor; @@ -119,6 +120,8 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth */ void closeChannel(int channelId) throws AMQException; + void closeChannel(int channelId, AMQConstant cause, String message) throws AMQException; + /** * Markes the specific channel as closed. This will release the lock for that channel id so a new channel can be * created on that id. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index 9d9bbe807b..a3833eebb9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -24,6 +24,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentSkipListSet; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.queue.AMQQueue; @@ -44,6 +45,8 @@ public interface AMQSessionModel extends Comparable<AMQSessionModel> public void close() throws AMQException; + public void close(AMQConstant cause, String message) throws AMQException; + public LogSubject getLogSubject(); /** diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 60bb8e4044..2b39fb1ff0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -36,6 +36,7 @@ import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.InboundMessage; @@ -477,6 +478,13 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu // TODO - required for AMQSessionModel / management initiated closing } + + @Override + public void close(AMQConstant cause, String message) throws AMQException + { + // TODO - required for AMQSessionModel + } + @Override public LogSubject getLogSubject() { @@ -603,4 +611,5 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu { return _connection; } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index cc28aba981..3a2cb7556d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -208,7 +208,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, ex.setDescription(message); ((ServerSession)session).invoke(ex); - ((ServerSession)session).close(); + ((ServerSession)session).close(cause, message); } public LogSubject getLogSubject() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 6152ddd228..3449cc5237 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -38,6 +38,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + import javax.security.auth.Subject; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; @@ -45,6 +47,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; @@ -140,6 +143,7 @@ public class ServerSession extends Session private final TransactionTimeoutHelper _transactionTimeoutHelper; + private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>(); public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) { @@ -380,7 +384,12 @@ public class ServerSession extends Session task.doTask(this); } - CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE()); + LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get(); + if (operationalLoggingMessage == null) + { + operationalLoggingMessage = ChannelMessages.CLOSE(); + } + CurrentActor.get().message(getLogSubject(), operationalLoggingMessage); } @Override @@ -787,6 +796,25 @@ public class ServerSession extends Session } @Override + public void close(AMQConstant cause, String message) + { + if (cause == null) + { + close(); + } + else + { + close(cause.getCode(), message); + } + } + + void close(int cause, String message) + { + _forcedCloseLogMessage.compareAndSet(null, ChannelMessages.CLOSE_FORCED(cause, message)); + close(); + } + + @Override public void close() { // unregister subscriptions in order to prevent sending of new messages diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 63419fce3f..0a4bb79ed7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -835,7 +835,7 @@ public class ServerSessionDelegate extends SessionDelegate session.invoke(ex); - session.close(); + ((ServerSession)session).close(errorCode.getValue(), description); } private Exchange getExchange(Session session, String exchangeName) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java index e94b79ba95..2f1276508c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java @@ -61,4 +61,13 @@ public class ChannelMessagesTest extends AbstractTestMessages validateLogMessage(log, "CHN-1003", expected); } + public void testChannelCloseForced() + { + _logMessage = ChannelMessages.CLOSE_FORCED(1, "Test"); + List<Object> log = performLog(); + + String[] expected = {"Close : 1 - Test"}; + + validateLogMessage(log, "CHN-1003", expected); + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 7f797afeda..ed60d5374b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -405,6 +405,11 @@ public class MockSubscription implements Subscription { return getId().compareTo(o.getId()); } + + @Override + public void close(AMQConstant cause, String message) throws AMQException + { + } } private static class MockConnectionModel implements AMQConnectionModel diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java index 1b2ec9c092..cec339c033 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java @@ -20,16 +20,22 @@ */ package org.apache.qpid.server.logging; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; import javax.jms.Connection; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import java.util.List; +import java.util.regex.Pattern; public class ChannelLoggingTest extends AbstractTestLogging { + private static final String CHANNEL_CLOSE_FORCED_MESSAGE_PATTERN = "CHN-1003 : Close : \\d* - .*"; private static final String CHANNEL_PREFIX = "CHN-"; // No explicit startup configuration is required for this test @@ -298,6 +304,102 @@ public class ChannelLoggingTest extends AbstractTestLogging validateChannelClose(results); } + public void testChannelClosedOnQueueArgumentsMismatch() throws Exception + { + assertLoggingNotYetOccured(CHANNEL_PREFIX); + + Connection connection = getConnection(); + + // Create a session and then close it + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + waitForMessage("CHN-1001"); + + String testQueueName = getTestQueueName(); + + Queue nonDurableQueue = (Queue) session.createQueue("direct://amq.direct/" + testQueueName + "/" + testQueueName + + "?durable='false'"); + + ((AMQSession<?,?>)session).declareAndBind((AMQDestination)nonDurableQueue); + + Queue durableQueue = (Queue) session.createQueue("direct://amq.direct/" + testQueueName + "/" + testQueueName + + "?durable='true'"); + try + { + ((AMQSession<?,?>)session).declareAndBind((AMQDestination) durableQueue); + fail("Exception not thrown"); + } + catch (AMQChannelClosedException acce) + { + // pass + } + catch (Exception e) + { + fail("Wrong exception thrown " + e); + } + waitForMessage("CHN-1003"); + + List<String> results = findMatches(CHANNEL_PREFIX); + assertTrue("No CHN messages logged", results.size() > 0); + + String closeLog = results.get(results.size() -1); + int closeMessageID = closeLog.indexOf("CHN-1003"); + assertFalse("CHN-1003 is not found", closeMessageID == -1); + + String closeMessage = closeLog.substring(closeMessageID); + assertTrue("Unexpected close channel message :" + closeMessage, Pattern.matches(CHANNEL_CLOSE_FORCED_MESSAGE_PATTERN, closeMessage)); + + session.close(); + connection.close(); + } + + public void testChannelClosedOnExclusiveQueueDeclaredOnDifferentSession() throws Exception + { + assertLoggingNotYetOccured(CHANNEL_PREFIX); + + Connection connection = getConnection(); + + // Create a session and then close it + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + waitForMessage("CHN-1001"); + + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + waitForMessage("CHN-1001"); + + String testQueueName = getTestQueueName(); + + Queue queue = (Queue) session.createQueue("direct://amq.direct/" + testQueueName + "/" + testQueueName + + "?exclusive='true'"); + + ((AMQSession<?,?>)session).declareAndBind((AMQDestination)queue); + + try + { + ((AMQSession<?,?>)session2).declareAndBind((AMQDestination) queue); + fail("Exception not thrown"); + } + catch (AMQException acce) + { + // pass + } + catch (Exception e) + { + fail("Wrong exception thrown " + e); + } + waitForMessage("CHN-1003"); + + List<String> results = findMatches(CHANNEL_PREFIX); + assertTrue("No CHN messages logged", results.size() > 0); + + String closeLog = results.get(results.size() -1); + int closeMessageID = closeLog.indexOf("CHN-1003"); + assertFalse("CHN-1003 is not found", closeMessageID == -1); + + String closeMessage = closeLog.substring(closeMessageID); + assertTrue("Unexpected close channel message :" + closeMessage, Pattern.matches(CHANNEL_CLOSE_FORCED_MESSAGE_PATTERN, closeMessage)); + + session.close(); + connection.close(); + } private void validateChannelClose(List<String> results) { String open = getLogMessage(results, 0); diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index 96f90701fd..b0489fca38 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -33,6 +33,7 @@ org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testUnsubscribeWhenUsing org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartsFlowStopped org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartConsumerFlowStarted org.apache.qpid.server.logging.SubscriptionLoggingTest#testSubscriptionSuspend +org.apache.qpid.server.logging.ChannelLoggingTest#testChannelClosedOnQueueArgumentsMismatch // 0-10 is not supported by the MethodRegistry org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#* diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes index ad8682a0bd..7f9a20e245 100644 --- a/qpid/java/test-profiles/JavaPre010Excludes +++ b/qpid/java/test-profiles/JavaPre010Excludes @@ -36,6 +36,7 @@ org.apache.qpid.server.queue.AddressBasedSortedQueueTest#* // Those tests are written against the 0.10 path org.apache.qpid.test.unit.message.UTF8Test#* org.apache.qpid.client.SynchReceiveTest#testReceiveNoWait +org.apache.qpid.server.logging.ChannelLoggingTest#testChannelClosedOnExclusiveQueueDeclaredOnDifferentSession // Makes explicit use of 0-10 connection object org.apache.qpid.client.ssl.SSLTest#testMultipleCertsInSingleStore |
