diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-03-02 14:09:25 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-03-02 14:09:25 +0000 |
| commit | e274c8f94e7bb9e533a9ed0393b71b590a591cc3 (patch) | |
| tree | 0172bb238d307bbf340484c1aded326047033ee7 /qpid/java/broker-plugins | |
| parent | 67a9864ed236ed085e263beaea7bae2c52522331 (diff) | |
| download | qpid-python-e274c8f94e7bb9e533a9ed0393b71b590a591cc3.tar.gz | |
Make management close session async
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1663314 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
6 files changed, 55 insertions, 76 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index e9aa57f480..0c2fc46a11 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -261,23 +261,32 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S _onOpenTask = task; } - public void closeSession(ServerSession session, AMQConstant cause, String message) + public void closeSessionAsync(final ServerSession session, final AMQConstant cause, final String message) { - ExecutionException ex = new ExecutionException(); - ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; - try - { - code = ExecutionErrorCode.get(cause.getCode()); - } - catch (IllegalArgumentException iae) + addAsyncTask(new Action<ServerConnection>() { - // Ignore, already set to INTERNAL_ERROR - } - ex.setErrorCode(code); - ex.setDescription(message); - session.invoke(ex); - session.close(cause, message); + @Override + public void performAction(final ServerConnection conn) + { + ExecutionException ex = new ExecutionException(); + ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; + try + { + code = ExecutionErrorCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore, already set to INTERNAL_ERROR + } + ex.setErrorCode(code); + ex.setDescription(message); + session.invoke(ex); + + session.close(cause, message); + } + }); + } public LogSubject getLogSubject() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 50b957d066..17cf8e7101 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -189,7 +189,7 @@ public class ServerSession extends Session @Override public void doTimeoutAction(String reason) { - getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); + getConnectionModel().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); } }, getVirtualHost()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 7c79e00c0b..d7e5857924 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -40,7 +40,6 @@ import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; import javax.security.auth.Subject; @@ -97,7 +96,6 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; @@ -106,6 +104,7 @@ import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.QueueExistsException; @@ -1710,16 +1709,7 @@ public class AMQChannel */ private void closeConnection(String reason) throws AMQException { - Lock receivedLock = _connection.getReceivedLock(); - receivedLock.lock(); - try - { - _connection.closeAsync(AMQConstant.RESOURCE_ERROR, reason); - } - finally - { - receivedLock.unlock(); - } + _connection.closeAsync(AMQConstant.RESOURCE_ERROR, reason); } public void deadLetter(long deliveryTag) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 08411b8581..cb65424b67 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -380,7 +380,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _lastIoTime = arrivalTime; _readBytes += msg.remaining(); - _receivedLock.lock(); try { _decoder.decodeBuffer(msg); @@ -432,10 +431,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _logger.error("Store Exception ignored as virtual host no longer active", e); } } - finally - { - _receivedLock.unlock(); - } return null; } }); @@ -846,14 +841,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, if(_closing.compareAndSet(false,true)) { // force sync of outstanding async work - _receivedLock.lock(); try { receivedComplete(); } finally { - _receivedLock.unlock(); finishClose(connectionDropped); } @@ -918,17 +911,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { synchronized(this) { - final boolean lockHeld = _receivedLock.isHeldByCurrentThread(); final long endTime = System.currentTimeMillis() + AWAIT_CLOSED_TIMEOUT; while(!_closed && endTime > System.currentTimeMillis()) { try { - if(lockHeld) - { - _receivedLock.unlock(); - } wait(1000); } catch (InterruptedException e) @@ -936,13 +924,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Thread.currentThread().interrupt(); break; } - finally - { - if(lockHeld) - { - _receivedLock.lock(); - } - } } if (!_closed) @@ -1381,31 +1362,37 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return String.valueOf(getRemoteAddress()); } - public void closeSession(AMQChannel session, AMQConstant cause, String message) + public void closeSessionAsync(final AMQChannel session, final AMQConstant cause, final String message) { - int channelId = session.getChannelId(); - closeChannel(channelId, cause, message); + addAsyncTask(new Action<AMQProtocolEngine>() + { - MethodRegistry methodRegistry = getMethodRegistry(); - ChannelCloseBody responseBody = - methodRegistry.createChannelCloseBody( - cause.getCode(), - AMQShortString.validValueOf(message), - 0, 0); + @Override + public void performAction(final AMQProtocolEngine object) + { + int channelId = session.getChannelId(); + closeChannel(channelId, cause, message); + + MethodRegistry methodRegistry = getMethodRegistry(); + ChannelCloseBody responseBody = + methodRegistry.createChannelCloseBody( + cause.getCode(), + AMQShortString.validValueOf(message), + 0, 0); + + writeFrame(responseBody.generateFrame(channelId)); + } + }); - writeFrame(responseBody.generateFrame(channelId)); } public void closeAsync(final AMQConstant cause, final String message) { - _logger.debug("KWDEBUG About to schedule close"); - Action<AMQProtocolEngine> action = new Action<AMQProtocolEngine>() { @Override public void performAction(final AMQProtocolEngine object) { - _logger.debug("KWDEBUG About to perform close"); closeConnection(0, new AMQConnectionException(cause, message, 0, 0, getMethodRegistry(), null)); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index f8098eb2ec..3a759cd772 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -41,7 +41,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MessagePublishInfo; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageContentSource; @@ -225,17 +224,6 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr // Then the AMQMinaProtocolSession can join on the returning future without a NPE. } - public void closeSession(AMQChannel session, AMQConstant cause, String message) - { - super.closeSession(session, cause, message); - - //Simulate the Client responding with a CloseOK - // should really update the StateManger but we don't have access here - // changeState(AMQState.CONNECTION_CLOSED); - ((AMQChannel)session).getConnection().closeSession(false); - - } - private class InternalWriteDeliverMethod implements ClientDeliveryMethod { private int _channelId; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index cd4f269029..77932fa680 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -37,7 +37,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import javax.security.auth.Subject; -import org.apache.qpid.AMQConnectionException; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; @@ -47,7 +46,6 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.End; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Broker; @@ -55,13 +53,13 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import org.apache.qpid.transport.Connection; public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0> { @@ -295,9 +293,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } @Override - public void closeSession(Session_1_0 session, AMQConstant cause, String message) + public void closeSessionAsync(final Session_1_0 session, final AMQConstant cause, final String message) { - session.close(cause, message); + addAsyncTask(new Action<Connection_1_0>() + { + @Override + public void performAction(final Connection_1_0 object) + { + session.close(cause, message); + } + }); } @Override |
