From db70f1d2908f294fee0ed47cdb478c3ab0f3b252 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Thu, 19 Feb 2015 15:24:27 +0000 Subject: Connection close is now performed by i/o thread git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1660909 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/connection/ConnectionRegistry.java | 2 +- .../server/model/AbstractConfiguredObject.java | 31 ++++++-- .../org/apache/qpid/server/model/CloseFuture.java | 26 +++++++ .../server/model/adapter/ConnectionAdapter.java | 86 ++++++++++++++++++---- .../qpid/server/protocol/AMQConnectionModel.java | 3 +- .../protocol/MultiVersionProtocolEngine.java | 8 +- .../qpid/server/protocol/ServerProtocolEngine.java | 2 +- .../apache/qpid/server/queue/AbstractQueue.java | 4 +- .../server/transport/NonBlockingConnection.java | 2 +- .../server/virtualhost/AbstractVirtualHost.java | 3 +- .../apache/qpid/server/consumer/MockConsumer.java | 2 +- .../actors/BaseConnectionActorTestCase.java | 2 +- .../apache/qpid/server/model/VirtualHostTest.java | 4 +- .../server/protocol/v0_10/ProtocolEngine_0_10.java | 8 +- .../server/protocol/v0_10/ServerConnection.java | 69 +++++++++++++---- .../protocol/v0_10/ServerSessionDelegate.java | 2 +- .../qpid/server/protocol/v0_8/AMQChannel.java | 2 +- .../server/protocol/v0_8/AMQProtocolEngine.java | 53 ++++++++++--- .../qpid/server/protocol/v1_0/Connection_1_0.java | 45 ++++++++++- .../protocol/v1_0/ProtocolEngine_1_0_0_SASL.java | 8 +- .../java/org/apache/qpid/transport/Connection.java | 6 ++ .../qpid/client/AsynchMessageListenerTest.java | 3 + .../management/jmx/ConnectionManagementTest.java | 40 ++++++++-- 23 files changed, 330 insertions(+), 81 deletions(-) create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index 883785c7ce..a24195075e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -74,7 +74,7 @@ public class ConnectionRegistry implements IConnectionRegistry AMQConnectionModel connection = itr.next(); try { - connection.close(AMQConstant.CONNECTION_FORCED, replyText); + connection.closeAsync(AMQConstant.CONNECTION_FORCED, replyText); } catch (Exception e) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index b9a4b32acb..baf465f6d1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -490,16 +490,37 @@ public abstract class AbstractConfiguredObject> im { if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) { - beforeClose(); - closeChildren(); - onClose(); - unregister(false); + CloseFuture close = beforeClose(); + + Runnable closeRunnable = new Runnable() + { + @Override + public void run() + { + closeChildren(); + onClose(); + unregister(false); + + } + }; + + if (close == null) + { + closeRunnable.run(); + } + else + { + close.runWhenComplete(closeRunnable); + } + + // if future not complete, schedule the remainder to be done once complete. } } - protected void beforeClose() + protected CloseFuture beforeClose() { + return null; } protected void onClose() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java new file mode 100644 index 0000000000..5e9d794e14 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.model; + + +public interface CloseFuture +{ + public void runWhenComplete(final Runnable closeRunnable); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 62f0e6ae06..64cfc39e1a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -31,6 +31,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.CloseFuture; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Port; @@ -51,6 +52,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject, S extends * @param cause * @param message */ - public void close(AMQConstant cause, String message); + public void closeAsync(AMQConstant cause, String message); public void block(); @@ -110,4 +110,5 @@ public interface AMQConnectionModel, S extends void notifyWork(); boolean isMessageAssignmentSuspended(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 759f5b8eb7..2ccf595c26 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -214,9 +214,9 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } @Override - public void processPendingMessages() + public void processPending() { - _delegate.processPendingMessages(); + _delegate.processPending(); } @Override @@ -260,7 +260,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } @Override - public void processPendingMessages() + public void processPending() { } @@ -418,7 +418,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } @Override - public void processPendingMessages() + public void processPending() { } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java index 7a0f43d74d..eba1f78ad0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java @@ -42,7 +42,7 @@ public interface ServerProtocolEngine extends ProtocolEngine boolean isMessageAssignmentSuspended(); - void processPendingMessages(); + void processPending(); boolean hasWork(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 65e8a1358d..60999fb2be 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -756,10 +756,10 @@ public abstract class AbstractQueue> } @Override - protected void beforeClose() + protected org.apache.qpid.server.model.CloseFuture beforeClose() { _closing = true; - super.beforeClose(); + return super.beforeClose(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java index 12ce46eedb..3a32ddd632 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java @@ -287,7 +287,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende _protocolEngine.setMessageAssignmentSuspended(true); - _protocolEngine.processPendingMessages(); + _protocolEngine.processPending(); _protocolEngine.setTransportBlockedForWriting(!doWrite()); boolean dataRead = doRead(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 4086a67aae..dce902b126 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -677,9 +677,10 @@ public abstract class AbstractVirtualHost> exte } @Override - protected void beforeClose() + protected CloseFuture beforeClose() { setState(State.UNAVAILABLE); + return null; } @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index caba0bd1d8..1c7cf9d566 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -538,7 +538,7 @@ public class MockConsumer implements ConsumerTarget } @Override - public void close(AMQConstant cause, String message) + public void closeAsync(AMQConstant cause, String message) { } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java index 65f9b4b148..e62a16fdec 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java @@ -55,7 +55,7 @@ public abstract class BaseConnectionActorTestCase extends BaseActorTestCase } if (_session != null) { - _session.close(AMQConstant.CONNECTION_FORCED, ""); + _session.closeAsync(AMQConstant.CONNECTION_FORCED, ""); } } finally diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index ba6b0d95f3..4485d5cc85 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -251,7 +251,7 @@ public class VirtualHostTest extends QpidTestCase 0, virtualHost.getChildren(Connection.class).size()); - verify(connection).close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + verify(connection).closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); } public void testDeleteVirtualHost_ClosesConnections() @@ -276,7 +276,7 @@ public class VirtualHostTest extends QpidTestCase 0, virtualHost.getChildren(Connection.class).size()); - verify(connection).close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + verify(connection).closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); } public void testCreateDurableQueue() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 0d1fcb008a..10bf0e761e 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -291,12 +291,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } @Override - public void processPendingMessages() + public void processPending() { - for (AMQSessionModel session : _connection.getSessionModels()) - { - session.processPendingMessages(); - } + _connection.processPending(); + } @Override diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index caa8b90485..e9aa57f480 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v0_10; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; +import static org.apache.qpid.transport.Connection.State.CLOSING; import java.net.SocketAddress; import java.security.Principal; @@ -30,6 +31,8 @@ import java.security.PrivilegedAction; import java.text.MessageFormat; import java.util.ArrayList; import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -81,9 +84,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel> _taskList = + private final CopyOnWriteArrayList> _connectionCloseTaskList = new CopyOnWriteArrayList>(); + private final Queue> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + private final CopyOnWriteArrayList _sessionListeners = new CopyOnWriteArrayList(); @@ -368,25 +374,35 @@ public class ServerConnection extends Connection implements AMQConnectionModel() { - // Ignore - } - close(replyCode, message); + @Override + public void performAction(final ServerConnection object) + { + closeSubscriptions(); + performDeleteTasks(); + + setState(CLOSING); + ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; + try + { + replyCode = ConnectionCloseCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore + } + sendConnectionClose(replyCode, message); + } + }); } protected void performDeleteTasks() { - for(Action task : _taskList) + for(Action task : _connectionCloseTaskList) { task.performAction(this); } @@ -659,13 +675,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel task) { - _taskList.add(task); + _connectionCloseTaskList.add(task); + } + + private void addAsyncTask(final Action action) + { + _asyncTaskList.add(action); + notifyWork(); } @Override public void removeDeleteTask(final Action task) { - _taskList.remove(task); + _connectionCloseTaskList.remove(task); } public int getMessageCompressionThreshold() @@ -698,4 +720,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPendingMessages(); + } + + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index d33297fbf6..95d54579a7 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -439,7 +439,7 @@ public class ServerSessionDelegate extends SessionDelegate } catch (VirtualHostUnavailableException e) { - getServerConnection(serverSession).close(AMQConstant.CONNECTION_FORCED, e.getMessage()); + getServerConnection(serverSession).closeAsync(AMQConstant.CONNECTION_FORCED, e.getMessage()); } finally { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index c0cc7d55b0..8736bbeb3b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -1714,7 +1714,7 @@ public class AMQChannel receivedLock.lock(); try { - _connection.close(AMQConstant.RESOURCE_ERROR, reason); + _connection.closeAsync(AMQConstant.RESOURCE_ERROR, reason); } finally { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 88412c3b70..08411b8581 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -36,8 +36,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -164,9 +166,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion); - private final List> _taskList = + private final List> _connectionCloseTaskList = new CopyOnWriteArrayList<>(); + private final Queue> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + private Map _closingChannelsList = new ConcurrentHashMap<>(); private ProtocolOutputConverter _protocolOutputConverter; private final Subject _authorizedSubject = new Subject(); @@ -849,6 +854,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, finally { _receivedLock.unlock(); + finishClose(connectionDropped); } @@ -890,7 +896,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { try { - for (Action task : _taskList) + for (Action task : _connectionCloseTaskList) { task.performAction(this); } @@ -975,13 +981,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, try { markChannelAwaitingCloseOk(channelId); - closeSession(false); + closeSession(false); // currently performs the delete actions. } finally { try { writeFrame(frame); + + // add an async job and not } finally { @@ -1133,12 +1141,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, public void addDeleteTask(Action task) { - _taskList.add(task); + _connectionCloseTaskList.add(task); } public void removeDeleteTask(Action task) { - _taskList.remove(task); + _connectionCloseTaskList.remove(task); } public ProtocolOutputConverter getProtocolOutputConverter() @@ -1388,11 +1396,29 @@ public class AMQProtocolEngine implements ServerProtocolEngine, writeFrame(responseBody.generateFrame(channelId)); } - public void close(AMQConstant cause, String message) + public void closeAsync(final AMQConstant cause, final String message) { - closeConnection(0, new AMQConnectionException(cause, message, 0, 0, - getMethodRegistry(), - null)); + _logger.debug("KWDEBUG About to schedule close"); + + Action action = new Action() + { + @Override + public void performAction(final AMQProtocolEngine object) + { + _logger.debug("KWDEBUG About to perform close"); + closeConnection(0, new AMQConnectionException(cause, message, 0, 0, + getMethodRegistry(), + null)); + + } + }; + addAsyncTask(action); + } + + private void addAsyncTask(final Action action) + { + _asyncTaskList.add(action); + notifyWork(); } public void block() @@ -2080,8 +2106,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } @Override - public void processPendingMessages() + public void processPending() { + while(_asyncTaskList.peek() != null) + { + Action asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + for (AMQSessionModel session : getSessionModels()) { session.processPendingMessages(); @@ -2100,7 +2132,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _stateChanged.set(true); final Action listener = _workListener.get(); - _logger.info("Work lister is null? " + (listener == null)); if(listener != null) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 097abe9d8b..cd4f269029 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -30,11 +30,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import javax.security.auth.Subject; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; @@ -58,6 +61,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.transport.Connection; public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel { @@ -100,6 +104,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private List> _closeTasks = Collections.synchronizedList(new ArrayList>()); + + private final Queue> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + + private boolean _closedOnOpen; @@ -213,6 +222,13 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod _closeTasks.add( task ); } + private void addAsyncTask(final Action action) + { + _asyncTaskList.add(action); + notifyWork(); + } + + public void closeReceived() { Collection sessions = new ArrayList(_sessions); @@ -251,9 +267,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod @Override - public void close(AMQConstant cause, String message) + public void closeAsync(AMQConstant cause, String message) { - _conn.close(); + Action action = new Action() + { + @Override + public void performAction(final Connection_1_0 object) + { + _conn.close(); + + } + }; + addAsyncTask(action); + } @Override @@ -510,4 +536,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod { return _protocolEngine.isMessageAssignmentSuspended(); } + + public void processPending() + { + while(_asyncTaskList.peek() != null) + { + Action asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPendingMessages(); + } + + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 0078235990..a0f10eee65 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -620,12 +620,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } @Override - public void processPendingMessages() + public void processPending() { - for (AMQSessionModel session : _connection.getSessionModels()) - { - session.processPendingMessages(); - } + _connection.processPending(); + } @Override diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index e7b16362e2..7007948980 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -616,6 +616,12 @@ public class Connection extends ConnectionInvoker close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface."); } + + protected void sendConnectionClose(ConnectionCloseCode replyCode, String replyText, Option ... _options) + { + connectionClose(replyCode, replyText, _options); + } + public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options) { synchronized (lock) diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java index a13bf71d5e..74b1f8a572 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java @@ -70,6 +70,9 @@ public class AsynchMessageListenerTest extends QpidBrokerTestCase } + + + public void testMessageListener() throws Exception { CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT); diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java index 67af3e17e4..ee9f2070d3 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java @@ -23,13 +23,7 @@ import java.util.Date; import java.util.Iterator; import java.util.List; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.Session; +import javax.jms.*; import javax.management.JMException; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; @@ -74,6 +68,38 @@ public class ConnectionManagementTest extends QpidBrokerTestCase } } + public void testManagementClosesConnection() throws Exception + { + assertEquals("Expected no managed connections", 0, getManagedConnections().size()); + + _connection = getConnection(); + assertEquals("Expected one managed connection", 1, getManagedConnections().size()); + + + ManagedConnection managedConnection = getManagedConnections().get(0); + + managedConnection.closeConnection(); + + assertEquals("Expected no managed connections", 0, getManagedConnections().size()); + + /* + try + { + + _connection.start(); + fail("Exception not thrown"); + } + catch (javax.jms.IllegalStateException ise) + { + ise.printStackTrace(); + // PASS + }*/ + + } + + + + public void testNumberOfManagedConnectionsMatchesNumberOfClientConnections() throws Exception { assertEquals("Expected no managed connections", 0, getManagedConnections().size()); -- cgit v1.2.1