summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-19 15:24:27 +0000
committerKeith Wall <kwall@apache.org>2015-02-19 15:24:27 +0000
commitdb70f1d2908f294fee0ed47cdb478c3ab0f3b252 (patch)
treebf08922f63b255a26182700f386bab4406db631d /qpid/java
parentc1926054f005af5084e46e6bf8da0c30120c82b4 (diff)
downloadqpid-python-db70f1d2908f294fee0ed47cdb478c3ab0f3b252.tar.gz
Connection close is now performed by i/o thread
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1660909 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java31
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java26
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java86
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java3
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java4
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java69
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java2
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java53
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java45
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java6
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java3
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java40
23 files changed, 330 insertions, 81 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index 883785c7ce..a24195075e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -74,7 +74,7 @@ public class ConnectionRegistry implements IConnectionRegistry
AMQConnectionModel connection = itr.next();
try
{
- connection.close(AMQConstant.CONNECTION_FORCED, replyText);
+ connection.closeAsync(AMQConstant.CONNECTION_FORCED, replyText);
}
catch (Exception e)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index b9a4b32acb..baf465f6d1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -490,16 +490,37 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
{
- beforeClose();
- closeChildren();
- onClose();
- unregister(false);
+ CloseFuture close = beforeClose();
+
+ Runnable closeRunnable = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ closeChildren();
+ onClose();
+ unregister(false);
+
+ }
+ };
+
+ if (close == null)
+ {
+ closeRunnable.run();
+ }
+ else
+ {
+ close.runWhenComplete(closeRunnable);
+ }
+
+ // if future not complete, schedule the remainder to be done once complete.
}
}
- protected void beforeClose()
+ protected CloseFuture beforeClose()
{
+ return null;
}
protected void onClose()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java
new file mode 100644
index 0000000000..5e9d794e14
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.model;
+
+
+public interface CloseFuture
+{
+ public void runWhenComplete(final Runnable closeRunnable);
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index 62f0e6ae06..64cfc39e1a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -31,6 +31,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.CloseFuture;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Port;
@@ -51,6 +52,7 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
private final Action _underlyingConnectionDeleteTask;
private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false);
private AMQConnectionModel _underlyingConnection;
+ private final AtomicBoolean _closing = new AtomicBoolean();
public ConnectionAdapter(final AMQConnectionModel conn)
{
@@ -158,15 +160,42 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
@StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
private void doDelete()
{
- closeUnderlyingConnection();
+ asyncClose();
deleted();
setState(State.DELETED);
}
@Override
+ protected CloseFuture beforeClose()
+ {
+ _closing.set(true);
+
+ final ConnectionCloseFuture closeFuture = asyncClose();
+
+ return closeFuture;
+ }
+
+ private ConnectionCloseFuture asyncClose()
+ {
+ final ConnectionCloseFuture closeFuture = new ConnectionCloseFuture();
+
+ _underlyingConnection.addDeleteTask(new Action()
+ {
+ @Override
+ public void performAction(final Object object)
+ {
+ LOGGER.debug("KWDEBUG underlying connection deleted");
+ closeFuture.connectionClosed();
+ }
+ });
+
+ _underlyingConnection.closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ return closeFuture;
+ }
+
+ @Override
protected void onClose()
{
- closeUnderlyingConnection();
}
@Override
@@ -233,23 +262,54 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
// SessionAdapter installs delete task to cause session model object to delete
}
- private void closeUnderlyingConnection()
+
+ private static class ConnectionCloseFuture implements CloseFuture
{
- if (_underlyingClosed.compareAndSet(false, true))
+ private boolean _closed;
+
+ public synchronized void connectionClosed()
{
- _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask);
- try
+ _closed = true;
+ notifyAll();
+
+ }
+
+ @Override
+ public void runWhenComplete(final Runnable closeRunnable)
+ {
+ if (_closed )
{
- _underlyingConnection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ closeRunnable.run();
}
- catch (Exception e)
+ else
{
- LOGGER.warn("Exception closing connection "
- + _underlyingConnection.getConnectionId()
- + " from "
- + _underlyingConnection.getRemoteAddressString(), e);
- }
+ Thread t = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ synchronized (ConnectionCloseFuture.this)
+ {
+ while (!_closed)
+ {
+ try
+ {
+ ConnectionCloseFuture.this.wait();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ closeRunnable.run();
+ }
+ }
+ });
+
+ t.setDaemon(true);
+ t.start();
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 353dcd98d6..50cfba5bec 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -40,7 +40,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
* @param cause
* @param message
*/
- public void close(AMQConstant cause, String message);
+ public void closeAsync(AMQConstant cause, String message);
public void block();
@@ -110,4 +110,5 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
void notifyWork();
boolean isMessageAssignmentSuspended();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 759f5b8eb7..2ccf595c26 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -214,9 +214,9 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
- _delegate.processPendingMessages();
+ _delegate.processPending();
}
@Override
@@ -260,7 +260,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
}
@@ -418,7 +418,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
index 7a0f43d74d..eba1f78ad0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
@@ -42,7 +42,7 @@ public interface ServerProtocolEngine extends ProtocolEngine
boolean isMessageAssignmentSuspended();
- void processPendingMessages();
+ void processPending();
boolean hasWork();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 65e8a1358d..60999fb2be 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -756,10 +756,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
- protected void beforeClose()
+ protected org.apache.qpid.server.model.CloseFuture beforeClose()
{
_closing = true;
- super.beforeClose();
+ return super.beforeClose();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
index 12ce46eedb..3a32ddd632 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -287,7 +287,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
_protocolEngine.setMessageAssignmentSuspended(true);
- _protocolEngine.processPendingMessages();
+ _protocolEngine.processPending();
_protocolEngine.setTransportBlockedForWriting(!doWrite());
boolean dataRead = doRead();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 4086a67aae..dce902b126 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -677,9 +677,10 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
- protected void beforeClose()
+ protected CloseFuture beforeClose()
{
setState(State.UNAVAILABLE);
+ return null;
}
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index caba0bd1d8..1c7cf9d566 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -538,7 +538,7 @@ public class MockConsumer implements ConsumerTarget
}
@Override
- public void close(AMQConstant cause, String message)
+ public void closeAsync(AMQConstant cause, String message)
{
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
index 65f9b4b148..e62a16fdec 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
@@ -55,7 +55,7 @@ public abstract class BaseConnectionActorTestCase extends BaseActorTestCase
}
if (_session != null)
{
- _session.close(AMQConstant.CONNECTION_FORCED, "");
+ _session.closeAsync(AMQConstant.CONNECTION_FORCED, "");
}
}
finally
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index ba6b0d95f3..4485d5cc85 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -251,7 +251,7 @@ public class VirtualHostTest extends QpidTestCase
0,
virtualHost.getChildren(Connection.class).size());
- verify(connection).close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ verify(connection).closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
}
public void testDeleteVirtualHost_ClosesConnections()
@@ -276,7 +276,7 @@ public class VirtualHostTest extends QpidTestCase
0,
virtualHost.getChildren(Connection.class).size());
- verify(connection).close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ verify(connection).closeAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
}
public void testCreateDurableQueue()
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 0d1fcb008a..10bf0e761e 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -291,12 +291,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
- for (AMQSessionModel session : _connection.getSessionModels())
- {
- session.processPendingMessages();
- }
+ _connection.processPending();
+
}
@Override
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index caa8b90485..e9aa57f480 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.protocol.v0_10;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
+import static org.apache.qpid.transport.Connection.State.CLOSING;
import java.net.SocketAddress;
import java.security.Principal;
@@ -30,6 +31,8 @@ import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -81,9 +84,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
private boolean _blocking;
private Transport _transport;
- private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList =
+ private final CopyOnWriteArrayList<Action<? super ServerConnection>> _connectionCloseTaskList =
new CopyOnWriteArrayList<Action<? super ServerConnection>>();
+ private final Queue<Action<? super ServerConnection>> _asyncTaskList =
+ new ConcurrentLinkedQueue<>();
+
private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
new CopyOnWriteArrayList<SessionModelListener>();
@@ -368,25 +374,35 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
}
}
- public void close(AMQConstant cause, String message)
+ public void closeAsync(final AMQConstant cause, final String message)
{
- closeSubscriptions();
- performDeleteTasks();
- ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
- try
- {
- replyCode = ConnectionCloseCode.get(cause.getCode());
- }
- catch (IllegalArgumentException iae)
+
+ addAsyncTask(new Action<ServerConnection>()
{
- // Ignore
- }
- close(replyCode, message);
+ @Override
+ public void performAction(final ServerConnection object)
+ {
+ closeSubscriptions();
+ performDeleteTasks();
+
+ setState(CLOSING);
+ ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
+ try
+ {
+ replyCode = ConnectionCloseCode.get(cause.getCode());
+ }
+ catch (IllegalArgumentException iae)
+ {
+ // Ignore
+ }
+ sendConnectionClose(replyCode, message);
+ }
+ });
}
protected void performDeleteTasks()
{
- for(Action<? super ServerConnection> task : _taskList)
+ for(Action<? super ServerConnection> task : _connectionCloseTaskList)
{
task.performAction(this);
}
@@ -659,13 +675,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
@Override
public void addDeleteTask(final Action<? super ServerConnection> task)
{
- _taskList.add(task);
+ _connectionCloseTaskList.add(task);
+ }
+
+ private void addAsyncTask(final Action<ServerConnection> action)
+ {
+ _asyncTaskList.add(action);
+ notifyWork();
}
@Override
public void removeDeleteTask(final Action<? super ServerConnection> task)
{
- _taskList.remove(task);
+ _connectionCloseTaskList.remove(task);
}
public int getMessageCompressionThreshold()
@@ -698,4 +720,19 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
{
return _serverProtocolEngine.isMessageAssignmentSuspended();
}
+
+ public void processPending()
+ {
+ while(_asyncTaskList.peek() != null)
+ {
+ Action<? super ServerConnection> asyncAction = _asyncTaskList.poll();
+ asyncAction.performAction(this);
+ }
+
+ for (AMQSessionModel session : getSessionModels())
+ {
+ session.processPendingMessages();
+ }
+
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index d33297fbf6..95d54579a7 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -439,7 +439,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
catch (VirtualHostUnavailableException e)
{
- getServerConnection(serverSession).close(AMQConstant.CONNECTION_FORCED, e.getMessage());
+ getServerConnection(serverSession).closeAsync(AMQConstant.CONNECTION_FORCED, e.getMessage());
}
finally
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index c0cc7d55b0..8736bbeb3b 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -1714,7 +1714,7 @@ public class AMQChannel
receivedLock.lock();
try
{
- _connection.close(AMQConstant.RESOURCE_ERROR, reason);
+ _connection.closeAsync(AMQConstant.RESOURCE_ERROR, reason);
}
finally
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 88412c3b70..08411b8581 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -36,8 +36,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -164,9 +166,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion);
- private final List<Action<? super AMQProtocolEngine>> _taskList =
+ private final List<Action<? super AMQProtocolEngine>> _connectionCloseTaskList =
new CopyOnWriteArrayList<>();
+ private final Queue<Action<? super AMQProtocolEngine>> _asyncTaskList =
+ new ConcurrentLinkedQueue<>();
+
private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>();
private ProtocolOutputConverter _protocolOutputConverter;
private final Subject _authorizedSubject = new Subject();
@@ -849,6 +854,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
finally
{
_receivedLock.unlock();
+
finishClose(connectionDropped);
}
@@ -890,7 +896,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
{
try
{
- for (Action<? super AMQProtocolEngine> task : _taskList)
+ for (Action<? super AMQProtocolEngine> task : _connectionCloseTaskList)
{
task.performAction(this);
}
@@ -975,13 +981,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
try
{
markChannelAwaitingCloseOk(channelId);
- closeSession(false);
+ closeSession(false); // currently performs the delete actions.
}
finally
{
try
{
writeFrame(frame);
+
+ // add an async job and not
}
finally
{
@@ -1133,12 +1141,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
public void addDeleteTask(Action<? super AMQProtocolEngine> task)
{
- _taskList.add(task);
+ _connectionCloseTaskList.add(task);
}
public void removeDeleteTask(Action<? super AMQProtocolEngine> task)
{
- _taskList.remove(task);
+ _connectionCloseTaskList.remove(task);
}
public ProtocolOutputConverter getProtocolOutputConverter()
@@ -1388,11 +1396,29 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
writeFrame(responseBody.generateFrame(channelId));
}
- public void close(AMQConstant cause, String message)
+ public void closeAsync(final AMQConstant cause, final String message)
{
- closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
- getMethodRegistry(),
- null));
+ _logger.debug("KWDEBUG About to schedule close");
+
+ Action<AMQProtocolEngine> action = new Action<AMQProtocolEngine>()
+ {
+ @Override
+ public void performAction(final AMQProtocolEngine object)
+ {
+ _logger.debug("KWDEBUG About to perform close");
+ closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
+ getMethodRegistry(),
+ null));
+
+ }
+ };
+ addAsyncTask(action);
+ }
+
+ private void addAsyncTask(final Action<AMQProtocolEngine> action)
+ {
+ _asyncTaskList.add(action);
+ notifyWork();
}
public void block()
@@ -2080,8 +2106,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
+ while(_asyncTaskList.peek() != null)
+ {
+ Action<? super AMQProtocolEngine> asyncAction = _asyncTaskList.poll();
+ asyncAction.performAction(this);
+ }
+
for (AMQSessionModel session : getSessionModels())
{
session.processPendingMessages();
@@ -2100,7 +2132,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_stateChanged.set(true);
final Action<ServerProtocolEngine> listener = _workListener.get();
- _logger.info("Work lister is null? " + (listener == null));
if(listener != null)
{
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 097abe9d8b..cd4f269029 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -30,11 +30,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.security.auth.Subject;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
@@ -58,6 +61,7 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.transport.Connection;
public class Connection_1_0 implements ConnectionEventListener, AMQConnectionModel<Connection_1_0,Session_1_0>
{
@@ -100,6 +104,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
private List<Action<? super Connection_1_0>> _closeTasks =
Collections.synchronizedList(new ArrayList<Action<? super Connection_1_0>>());
+
+ private final Queue<Action<? super Connection_1_0>> _asyncTaskList =
+ new ConcurrentLinkedQueue<>();
+
+
private boolean _closedOnOpen;
@@ -213,6 +222,13 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
_closeTasks.add( task );
}
+ private void addAsyncTask(final Action<Connection_1_0> action)
+ {
+ _asyncTaskList.add(action);
+ notifyWork();
+ }
+
+
public void closeReceived()
{
Collection<Session_1_0> sessions = new ArrayList(_sessions);
@@ -251,9 +267,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
@Override
- public void close(AMQConstant cause, String message)
+ public void closeAsync(AMQConstant cause, String message)
{
- _conn.close();
+ Action<Connection_1_0> action = new Action<Connection_1_0>()
+ {
+ @Override
+ public void performAction(final Connection_1_0 object)
+ {
+ _conn.close();
+
+ }
+ };
+ addAsyncTask(action);
+
}
@Override
@@ -510,4 +536,19 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
{
return _protocolEngine.isMessageAssignmentSuspended();
}
+
+ public void processPending()
+ {
+ while(_asyncTaskList.peek() != null)
+ {
+ Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll();
+ asyncAction.performAction(this);
+ }
+
+ for (AMQSessionModel session : getSessionModels())
+ {
+ session.processPendingMessages();
+ }
+
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 0078235990..a0f10eee65 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -620,12 +620,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
- for (AMQSessionModel session : _connection.getSessionModels())
- {
- session.processPendingMessages();
- }
+ _connection.processPending();
+
}
@Override
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index e7b16362e2..7007948980 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -616,6 +616,12 @@ public class Connection extends ConnectionInvoker
close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface.");
}
+
+ protected void sendConnectionClose(ConnectionCloseCode replyCode, String replyText, Option ... _options)
+ {
+ connectionClose(replyCode, replyText, _options);
+ }
+
public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options)
{
synchronized (lock)
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java
index a13bf71d5e..74b1f8a572 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java
@@ -70,6 +70,9 @@ public class AsynchMessageListenerTest extends QpidBrokerTestCase
}
+
+
+
public void testMessageListener() throws Exception
{
CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT);
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java
index 67af3e17e4..ee9f2070d3 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java
@@ -23,13 +23,7 @@ import java.util.Date;
import java.util.Iterator;
import java.util.List;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.Session;
+import javax.jms.*;
import javax.management.JMException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
@@ -74,6 +68,38 @@ public class ConnectionManagementTest extends QpidBrokerTestCase
}
}
+ public void testManagementClosesConnection() throws Exception
+ {
+ assertEquals("Expected no managed connections", 0, getManagedConnections().size());
+
+ _connection = getConnection();
+ assertEquals("Expected one managed connection", 1, getManagedConnections().size());
+
+
+ ManagedConnection managedConnection = getManagedConnections().get(0);
+
+ managedConnection.closeConnection();
+
+ assertEquals("Expected no managed connections", 0, getManagedConnections().size());
+
+ /*
+ try
+ {
+
+ _connection.start();
+ fail("Exception not thrown");
+ }
+ catch (javax.jms.IllegalStateException ise)
+ {
+ ise.printStackTrace();
+ // PASS
+ }*/
+
+ }
+
+
+
+
public void testNumberOfManagedConnectionsMatchesNumberOfClientConnections() throws Exception
{
assertEquals("Expected no managed connections", 0, getManagedConnections().size());