diff options
| author | Keith Wall <kwall@apache.org> | 2015-02-19 15:24:27 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-02-19 15:24:27 +0000 |
| commit | db70f1d2908f294fee0ed47cdb478c3ab0f3b252 (patch) | |
| tree | bf08922f63b255a26182700f386bab4406db631d /qpid/java | |
| parent | c1926054f005af5084e46e6bf8da0c30120c82b4 (diff) | |
| download | qpid-python-db70f1d2908f294fee0ed47cdb478c3ab0f3b252.tar.gz | |
Connection close is now performed by i/o thread
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1660909 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
23 files changed, 330 insertions, 81 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index 883785c7ce..a24195075e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -74,7 +74,7 @@ public class ConnectionRegistry implements IConnectionRegistry AMQConnectionModel connection = itr.next(); try { - connection.close(AMQConstant.CONNECTION_FORCED, replyText); + connection.closeAsync(AMQConstant.CONNECTION_FORCED, replyText); } catch (Exception e) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index b9a4b32acb..baf465f6d1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -490,16 +490,37 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) { - beforeClose(); - closeChildren(); - onClose(); - unregister(false); + CloseFuture close = beforeClose(); + + Runnable closeRunnable = new Runnable() + { + @Override + public void run() + { + closeChildren(); + onClose(); + unregister(false); + + } + }; + + if (close == null) + { + closeRunnable.run(); + } + else + { + close.runWhenComplete(closeRunnable); + } + + // if future not complete, schedule the remainder to be done once complete. } } - protected void beforeClose() + protected CloseFuture beforeClose() { + return null; } protected void onClose() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java new file mode 100644 index 0000000000..5e9d794e14 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.model; + + +public interface CloseFuture +{ + public void runWhenComplete(final Runnable closeRunnable); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 62f0e6ae06..64cfc39e1a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -31,6 +31,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.CloseFuture; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Port; @@ -51,6 +52,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection private final Action _underlyingConnectionDeleteTask; private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false); private AMQConnectionModel _underlyingConnection; + private final AtomicBoolean _closing = new AtomicBoolean(); public ConnectionAdapter(final AMQConnectionModel conn) { @@ -158,15 +160,42 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED) private void doDelete() { - closeUnderlyingConnection(); + asyncClose(); deleted(); setState(State.DELETED); } @Override + protected CloseFuture beforeClose() + { + _closing.set(true); + + final ConnectionCloseFuture closeFuture = asyncClose(); + + return closeFuture; + } + + private ConnectionCloseFuture asyncClose() + { + final ConnectionCloseFuture closeFuture = new ConnectionCloseFuture(); + + _underlyingConnection.addDeleteTask(new Action() + { + @Override + public void performAction(final Object object) + { + LOGGER.debug("KWDEBUG underlying connection deleted"); + closeFuture.connectionClosed(); + } + }); + + _underlyingConnection.closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + return closeFuture; + } + + @Override protected void onClose() { - closeUnderlyingConnection(); } @Override @@ -233,23 +262,54 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection // SessionAdapter installs delete task to cause session model object to delete } - private void closeUnderlyingConnection() + + private static class ConnectionCloseFuture implements CloseFuture { - if (_underlyingClosed.compareAndSet(false, true)) + private boolean _closed; + + public synchronized void connectionClosed() { - _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask); - try + _closed = true; + notifyAll(); + + } + + @Override + public void runWhenComplete(final Runnable closeRunnable) + { + if (_closed ) { - _underlyingConnection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + closeRunnable.run(); } - catch (Exception e) + else { - LOGGER.warn("Exception closing connection " - + _underlyingConnection.getConnectionId() - + " from " - + _underlyingConnection.getRemoteAddressString(), e); - } + Thread t = new Thread(new Runnable() + { + @Override + public void run() + { + synchronized (ConnectionCloseFuture.this) + { + while (!_closed) + { + try + { + ConnectionCloseFuture.this.wait(); + } + catch (InterruptedException e) + { + } + } + + closeRunnable.run(); + } + } + }); + + t.setDaemon(true); + t.start(); + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 353dcd98d6..50cfba5bec 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -40,7 +40,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends * @param cause * @param message */ - public void close(AMQConstant cause, String message); + public void closeAsync(AMQConstant cause, String message); public void block(); @@ -110,4 +110,5 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends void notifyWork(); boolean isMessageAssignmentSuspended(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 759f5b8eb7..2ccf595c26 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -214,9 +214,9 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } @Override - public void processPendingMessages() + public void processPending() { - _delegate.processPendingMessages(); + _delegate.processPending(); } @Override @@ -260,7 +260,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } @Override - public void processPendingMessages() + public void processPending() { } @@ -418,7 +418,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } @Override - public void processPendingMessages() + public void processPending() { } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java index 7a0f43d74d..eba1f78ad0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java @@ -42,7 +42,7 @@ public interface ServerProtocolEngine extends ProtocolEngine boolean isMessageAssignmentSuspended(); - void processPendingMessages(); + void processPending(); boolean hasWork(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 65e8a1358d..60999fb2be 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -756,10 +756,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } @Override - protected void beforeClose() + protected org.apache.qpid.server.model.CloseFuture beforeClose() { _closing = true; - super.beforeClose(); + return super.beforeClose(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java index 12ce46eedb..3a32ddd632 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java @@ -287,7 +287,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende _protocolEngine.setMessageAssignmentSuspended(true); - _protocolEngine.processPendingMessages(); + _protocolEngine.processPending(); _protocolEngine.setTransportBlockedForWriting(!doWrite()); boolean dataRead = doRead(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 4086a67aae..dce902b126 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -677,9 +677,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @Override - protected void beforeClose() + protected CloseFuture beforeClose() { setState(State.UNAVAILABLE); + return null; } @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index caba0bd1d8..1c7cf9d566 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -538,7 +538,7 @@ public class MockConsumer implements ConsumerTarget } @Override - public void close(AMQConstant cause, String message) + public void closeAsync(AMQConstant cause, String message) { } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java index 65f9b4b148..e62a16fdec 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java @@ -55,7 +55,7 @@ public abstract class BaseConnectionActorTestCase extends BaseActorTestCase } if (_session != null) { - _session.close(AMQConstant.CONNECTION_FORCED, ""); + _session.closeAsync(AMQConstant.CONNECTION_FORCED, ""); } } finally diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index ba6b0d95f3..4485d5cc85 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -251,7 +251,7 @@ public class VirtualHostTest extends QpidTestCase 0, virtualHost.getChildren(Connection.class).size()); - verify(connection).close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + verify(connection).closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); } public void testDeleteVirtualHost_ClosesConnections() @@ -276,7 +276,7 @@ public class VirtualHostTest extends QpidTestCase 0, virtualHost.getChildren(Connection.class).size()); - verify(connection).close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + verify(connection).closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); } public void testCreateDurableQueue() diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 0d1fcb008a..10bf0e761e 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -291,12 +291,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } @Override - public void processPendingMessages() + public void processPending() { - for (AMQSessionModel session : _connection.getSessionModels()) - { - session.processPendingMessages(); - } + _connection.processPending(); + } @Override diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index caa8b90485..e9aa57f480 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v0_10; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT; +import static org.apache.qpid.transport.Connection.State.CLOSING; import java.net.SocketAddress; import java.security.Principal; @@ -30,6 +31,8 @@ import java.security.PrivilegedAction; import java.text.MessageFormat; import java.util.ArrayList; import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -81,9 +84,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private boolean _blocking; private Transport _transport; - private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList = + private final CopyOnWriteArrayList<Action<? super ServerConnection>> _connectionCloseTaskList = new CopyOnWriteArrayList<Action<? super ServerConnection>>(); + private final Queue<Action<? super ServerConnection>> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = new CopyOnWriteArrayList<SessionModelListener>(); @@ -368,25 +374,35 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } } - public void close(AMQConstant cause, String message) + public void closeAsync(final AMQConstant cause, final String message) { - closeSubscriptions(); - performDeleteTasks(); - ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; - try - { - replyCode = ConnectionCloseCode.get(cause.getCode()); - } - catch (IllegalArgumentException iae) + + addAsyncTask(new Action<ServerConnection>() { - // Ignore - } - close(replyCode, message); + @Override + public void performAction(final ServerConnection object) + { + closeSubscriptions(); + performDeleteTasks(); + + setState(CLOSING); + ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; + try + { + replyCode = ConnectionCloseCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore + } + sendConnectionClose(replyCode, message); + } + }); } protected void performDeleteTasks() { - for(Action<? super ServerConnection> task : _taskList) + for(Action<? super ServerConnection> task : _connectionCloseTaskList) { task.performAction(this); } @@ -659,13 +675,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S @Override public void addDeleteTask(final Action<? super ServerConnection> task) { - _taskList.add(task); + _connectionCloseTaskList.add(task); + } + + private void addAsyncTask(final Action<ServerConnection> action) + { + _asyncTaskList.add(action); + notifyWork(); } @Override public void removeDeleteTask(final Action<? super ServerConnection> task) { - _taskList.remove(task); + _connectionCloseTaskList.remove(task); } public int getMessageCompressionThreshold() @@ -698,4 +720,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { return _serverProtocolEngine.isMessageAssignmentSuspended(); } + + public void processPending() + { + while(_asyncTaskList.peek() != null) + { + Action<? super ServerConnection> asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPendingMessages(); + } + + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index d33297fbf6..95d54579a7 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -439,7 +439,7 @@ public class ServerSessionDelegate extends SessionDelegate } catch (VirtualHostUnavailableException e) { - getServerConnection(serverSession).close(AMQConstant.CONNECTION_FORCED, e.getMessage()); + getServerConnection(serverSession).closeAsync(AMQConstant.CONNECTION_FORCED, e.getMessage()); } finally { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index c0cc7d55b0..8736bbeb3b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -1714,7 +1714,7 @@ public class AMQChannel receivedLock.lock(); try { - _connection.close(AMQConstant.RESOURCE_ERROR, reason); + _connection.closeAsync(AMQConstant.RESOURCE_ERROR, reason); } finally { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 88412c3b70..08411b8581 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -36,8 +36,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -164,9 +166,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion); - private final List<Action<? super AMQProtocolEngine>> _taskList = + private final List<Action<? super AMQProtocolEngine>> _connectionCloseTaskList = new CopyOnWriteArrayList<>(); + private final Queue<Action<? super AMQProtocolEngine>> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>(); private ProtocolOutputConverter _protocolOutputConverter; private final Subject _authorizedSubject = new Subject(); @@ -849,6 +854,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, finally { _receivedLock.unlock(); + finishClose(connectionDropped); } @@ -890,7 +896,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { try { - for (Action<? super AMQProtocolEngine> task : _taskList) + for (Action<? super AMQProtocolEngine> task : _connectionCloseTaskList) { task.performAction(this); } @@ -975,13 +981,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, try { markChannelAwaitingCloseOk(channelId); - closeSession(false); + closeSession(false); // currently performs the delete actions. } finally { try { writeFrame(frame); + + // add an async job and not } finally { @@ -1133,12 +1141,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, public void addDeleteTask(Action<? super AMQProtocolEngine> task) { - _taskList.add(task); + _connectionCloseTaskList.add(task); } public void removeDeleteTask(Action<? super AMQProtocolEngine> task) { - _taskList.remove(task); + _connectionCloseTaskList.remove(task); } public ProtocolOutputConverter getProtocolOutputConverter() @@ -1388,11 +1396,29 @@ public class AMQProtocolEngine implements ServerProtocolEngine, writeFrame(responseBody.generateFrame(channelId)); } - public void close(AMQConstant cause, String message) + public void closeAsync(final AMQConstant cause, final String message) { - closeConnection(0, new AMQConnectionException(cause, message, 0, 0, - getMethodRegistry(), - null)); + _logger.debug("KWDEBUG About to schedule close"); + + Action<AMQProtocolEngine> action = new Action<AMQProtocolEngine>() + { + @Override + public void performAction(final AMQProtocolEngine object) + { + _logger.debug("KWDEBUG About to perform close"); + closeConnection(0, new AMQConnectionException(cause, message, 0, 0, + getMethodRegistry(), + null)); + + } + }; + addAsyncTask(action); + } + + private void addAsyncTask(final Action<AMQProtocolEngine> action) + { + _asyncTaskList.add(action); + notifyWork(); } public void block() @@ -2080,8 +2106,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } @Override - public void processPendingMessages() + public void processPending() { + while(_asyncTaskList.peek() != null) + { + Action<? super AMQProtocolEngine> asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + for (AMQSessionModel session : getSessionModels()) { session.processPendingMessages(); @@ -2100,7 +2132,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _stateChanged.set(true); final Action<ServerProtocolEngine> listener = _workListener.get(); - _logger.info("Work lister is null? " + (listener == null)); if(listener != null) { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 097abe9d8b..cd4f269029 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -30,11 +30,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import javax.security.auth.Subject; +import org.apache.qpid.AMQConnectionException; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; @@ -58,6 +61,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.transport.Connection; public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0> { @@ -100,6 +104,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private List<Action<? super Connection_1_0>> _closeTasks = Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>()); + + private final Queue<Action<? super Connection_1_0>> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + + private boolean _closedOnOpen; @@ -213,6 +222,13 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod _closeTasks.add( task ); } + private void addAsyncTask(final Action<Connection_1_0> action) + { + _asyncTaskList.add(action); + notifyWork(); + } + + public void closeReceived() { Collection<Session_1_0> sessions = new ArrayList(_sessions); @@ -251,9 +267,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod @Override - public void close(AMQConstant cause, String message) + public void closeAsync(AMQConstant cause, String message) { - _conn.close(); + Action<Connection_1_0> action = new Action<Connection_1_0>() + { + @Override + public void performAction(final Connection_1_0 object) + { + _conn.close(); + + } + }; + addAsyncTask(action); + } @Override @@ -510,4 +536,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod { return _protocolEngine.isMessageAssignmentSuspended(); } + + public void processPending() + { + while(_asyncTaskList.peek() != null) + { + Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } + + for (AMQSessionModel session : getSessionModels()) + { + session.processPendingMessages(); + } + + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 0078235990..a0f10eee65 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -620,12 +620,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } @Override - public void processPendingMessages() + public void processPending() { - for (AMQSessionModel session : _connection.getSessionModels()) - { - session.processPendingMessages(); - } + _connection.processPending(); + } @Override diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index e7b16362e2..7007948980 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -616,6 +616,12 @@ public class Connection extends ConnectionInvoker close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface."); } + + protected void sendConnectionClose(ConnectionCloseCode replyCode, String replyText, Option ... _options) + { + connectionClose(replyCode, replyText, _options); + } + public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options) { synchronized (lock) diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java index a13bf71d5e..74b1f8a572 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java @@ -70,6 +70,9 @@ public class AsynchMessageListenerTest extends QpidBrokerTestCase } + + + public void testMessageListener() throws Exception { CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT); diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java index 67af3e17e4..ee9f2070d3 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java @@ -23,13 +23,7 @@ import java.util.Date; import java.util.Iterator; import java.util.List; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.Session; +import javax.jms.*; import javax.management.JMException; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; @@ -74,6 +68,38 @@ public class ConnectionManagementTest extends QpidBrokerTestCase } } + public void testManagementClosesConnection() throws Exception + { + assertEquals("Expected no managed connections", 0, getManagedConnections().size()); + + _connection = getConnection(); + assertEquals("Expected one managed connection", 1, getManagedConnections().size()); + + + ManagedConnection managedConnection = getManagedConnections().get(0); + + managedConnection.closeConnection(); + + assertEquals("Expected no managed connections", 0, getManagedConnections().size()); + + /* + try + { + + _connection.start(); + fail("Exception not thrown"); + } + catch (javax.jms.IllegalStateException ise) + { + ise.printStackTrace(); + // PASS + }*/ + + } + + + + public void testNumberOfManagedConnectionsMatchesNumberOfClientConnections() throws Exception { assertEquals("Expected no managed connections", 0, getManagedConnections().size()); |
