diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-03-02 14:09:25 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-03-02 14:09:25 +0000 |
| commit | e274c8f94e7bb9e533a9ed0393b71b590a591cc3 (patch) | |
| tree | 0172bb238d307bbf340484c1aded326047033ee7 /qpid/java | |
| parent | 67a9864ed236ed085e263beaea7bae2c52522331 (diff) | |
| download | qpid-python-e274c8f94e7bb9e533a9ed0393b71b590a591cc3.tar.gz | |
Make management close session async
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1663314 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
13 files changed, 118 insertions, 92 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java index 82ae9f6454..237a5b4069 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java @@ -20,11 +20,12 @@ */ package org.apache.qpid.server.connection; +import java.net.SocketAddress; + +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.security.auth.SocketConnectionPrincipal; -import java.net.SocketAddress; - public class ConnectionPrincipal implements SocketConnectionPrincipal { private final AMQConnectionModel _connection; @@ -51,6 +52,11 @@ public class ConnectionPrincipal implements SocketConnectionPrincipal return _connection; } + public VirtualHost<?,?,?> getVirtualHost() + { + return _connection.getVirtualHost(); + } + @Override public boolean equals(final Object o) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/log4j/VirtualHostAppender.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/log4j/VirtualHostAppender.java new file mode 100644 index 0000000000..25887ef701 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/log4j/VirtualHostAppender.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.logging.log4j; + +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.spi.LoggingEvent; + +public class VirtualHostAppender extends AppenderSkeleton +{ + @Override + protected void append(final LoggingEvent event) + { + getLayout().format(event); + final Object virtualhost = event.getMDC("virtualhost"); + } + + @Override + public void close() + { + + } + + @Override + public boolean requiresLayout() + { + return false; + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 64cfc39e1a..eae438754b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -184,7 +184,6 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection @Override public void performAction(final Object object) { - LOGGER.debug("KWDEBUG underlying connection deleted"); closeFuture.connectionClosed(); } }); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 50cfba5bec..95b9bf8970 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -53,7 +53,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends * @param cause * @param message */ - public void closeSession(S session, AMQConstant cause, String message); + public void closeSessionAsync(S session, AMQConstant cause, String message); public long getConnectionId(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 1c7cf9d566..eafc969496 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -543,8 +543,8 @@ public class MockConsumer implements ConsumerTarget } @Override - public void closeSession(AMQSessionModel session, AMQConstant cause, - String message) + public void closeSessionAsync(AMQSessionModel session, AMQConstant cause, + String message) { } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index e9aa57f480..0c2fc46a11 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -261,23 +261,32 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S _onOpenTask = task; } - public void closeSession(ServerSession session, AMQConstant cause, String message) + public void closeSessionAsync(final ServerSession session, final AMQConstant cause, final String message) { - ExecutionException ex = new ExecutionException(); - ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; - try - { - code = ExecutionErrorCode.get(cause.getCode()); - } - catch (IllegalArgumentException iae) + addAsyncTask(new Action<ServerConnection>() { - // Ignore, already set to INTERNAL_ERROR - } - ex.setErrorCode(code); - ex.setDescription(message); - session.invoke(ex); - session.close(cause, message); + @Override + public void performAction(final ServerConnection conn) + { + ExecutionException ex = new ExecutionException(); + ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; + try + { + code = ExecutionErrorCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore, already set to INTERNAL_ERROR + } + ex.setErrorCode(code); + ex.setDescription(message); + session.invoke(ex); + + session.close(cause, message); + } + }); + } public LogSubject getLogSubject() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 50b957d066..17cf8e7101 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -189,7 +189,7 @@ public class ServerSession extends Session @Override public void doTimeoutAction(String reason) { - getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); + getConnectionModel().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); } }, getVirtualHost()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 7c79e00c0b..d7e5857924 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -40,7 +40,6 @@ import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; import javax.security.auth.Subject; @@ -97,7 +96,6 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; @@ -106,6 +104,7 @@ import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.QueueExistsException; @@ -1710,16 +1709,7 @@ public class AMQChannel */ private void closeConnection(String reason) throws AMQException { - Lock receivedLock = _connection.getReceivedLock(); - receivedLock.lock(); - try - { - _connection.closeAsync(AMQConstant.RESOURCE_ERROR, reason); - } - finally - { - receivedLock.unlock(); - } + _connection.closeAsync(AMQConstant.RESOURCE_ERROR, reason); } public void deadLetter(long deliveryTag) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 08411b8581..cb65424b67 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -380,7 +380,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _lastIoTime = arrivalTime; _readBytes += msg.remaining(); - _receivedLock.lock(); try { _decoder.decodeBuffer(msg); @@ -432,10 +431,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _logger.error("Store Exception ignored as virtual host no longer active", e); } } - finally - { - _receivedLock.unlock(); - } return null; } }); @@ -846,14 +841,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, if(_closing.compareAndSet(false,true)) { // force sync of outstanding async work - _receivedLock.lock(); try { receivedComplete(); } finally { - _receivedLock.unlock(); finishClose(connectionDropped); } @@ -918,17 +911,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { synchronized(this) { - final boolean lockHeld = _receivedLock.isHeldByCurrentThread(); final long endTime = System.currentTimeMillis() + AWAIT_CLOSED_TIMEOUT; while(!_closed && endTime > System.currentTimeMillis()) { try { - if(lockHeld) - { - _receivedLock.unlock(); - } wait(1000); } catch (InterruptedException e) @@ -936,13 +924,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Thread.currentThread().interrupt(); break; } - finally - { - if(lockHeld) - { - _receivedLock.lock(); - } - } } if (!_closed) @@ -1381,31 +1362,37 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return String.valueOf(getRemoteAddress()); } - public void closeSession(AMQChannel session, AMQConstant cause, String message) + public void closeSessionAsync(final AMQChannel session, final AMQConstant cause, final String message) { - int channelId = session.getChannelId(); - closeChannel(channelId, cause, message); + addAsyncTask(new Action<AMQProtocolEngine>() + { - MethodRegistry methodRegistry = getMethodRegistry(); - ChannelCloseBody responseBody = - methodRegistry.createChannelCloseBody( - cause.getCode(), - AMQShortString.validValueOf(message), - 0, 0); + @Override + public void performAction(final AMQProtocolEngine object) + { + int channelId = session.getChannelId(); + closeChannel(channelId, cause, message); + + MethodRegistry methodRegistry = getMethodRegistry(); + ChannelCloseBody responseBody = + methodRegistry.createChannelCloseBody( + cause.getCode(), + AMQShortString.validValueOf(message), + 0, 0); + + writeFrame(responseBody.generateFrame(channelId)); + } + }); - writeFrame(responseBody.generateFrame(channelId)); } public void closeAsync(final AMQConstant cause, final String message) { - _logger.debug("KWDEBUG About to schedule close"); - Action<AMQProtocolEngine> action = new Action<AMQProtocolEngine>() { @Override public void performAction(final AMQProtocolEngine object) { - _logger.debug("KWDEBUG About to perform close"); closeConnection(0, new AMQConnectionException(cause, message, 0, 0, getMethodRegistry(), null)); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index f8098eb2ec..3a759cd772 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -41,7 +41,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MessagePublishInfo; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageContentSource; @@ -225,17 +224,6 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr // Then the AMQMinaProtocolSession can join on the returning future without a NPE. } - public void closeSession(AMQChannel session, AMQConstant cause, String message) - { - super.closeSession(session, cause, message); - - //Simulate the Client responding with a CloseOK - // should really update the StateManger but we don't have access here - // changeState(AMQState.CONNECTION_CLOSED); - ((AMQChannel)session).getConnection().closeSession(false); - - } - private class InternalWriteDeliverMethod implements ClientDeliveryMethod { private int _channelId; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index cd4f269029..77932fa680 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -37,7 +37,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import javax.security.auth.Subject; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; @@ -47,7 +46,6 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.End; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Broker; @@ -55,13 +53,13 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.transport.Connection; public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0> { @@ -295,9 +293,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } @Override - public void closeSession(Session_1_0 session, AMQConstant cause, String message) + public void closeSessionAsync(final Session_1_0 session, final AMQConstant cause, final String message) { - session.close(cause, message); + addAsyncTask(new Action<Connection_1_0>() + { + @Override + public void performAction(final Connection_1_0 object) + { + session.close(cause, message); + } + }); } @Override diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java index cdd5e13870..0acdd2509d 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java @@ -20,12 +20,6 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -33,6 +27,12 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase { private Connection con; @@ -122,8 +122,6 @@ public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase { Message msg = consumer.receive(3000); - _logger.debug("KWDEBUG got " + msg); - assertNotNull("Message " + i + " should not be null", msg); assertTrue("Message " + i + " should be a text message", msg instanceof TextMessage); assertEquals("Message content " + i + " does not match expected", Integer.toString(i), ((TextMessage) msg).getText()); diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index cf28895799..8a4e22783f 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -147,8 +147,6 @@ public class QueueBrowserAutoAckTest extends QpidBrokerTestCase assertEquals("Session reports Queue expectedDepth not as expected", expectedDepth, queueDepth); - getLogger().debug("KWDEBUG : About to check queue depth using browser"); - // Browse the queue to get a second opinion int msgCount = 0; |
