diff options
| author | Keith Wall <kwall@apache.org> | 2015-02-12 15:18:16 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-02-12 15:18:16 +0000 |
| commit | e243745d439671418016a2be1570209269b45070 (patch) | |
| tree | 7f17111b385b7759869e2e5594e591979097cc6f /qpid/java | |
| parent | 825aceb7e885c793309557a3a886f10c475c4c1c (diff) | |
| download | qpid-python-e243745d439671418016a2be1570209269b45070.tar.gz | |
0-10 queue browser fix.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659288 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
7 files changed, 29 insertions, 11 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 4fb89575aa..450d4d98d5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -316,6 +316,7 @@ class QueueConsumerImpl public final void flush() { _queue.flushConsumer(this); + _target.processPending(); } public boolean resend(final QueueEntry entry) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java index 786f1915a7..cc5ee71cf5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java @@ -286,7 +286,16 @@ public class SelectorThread extends Thread @Override public void run() { - processConnection(connection); + String currentName = Thread.currentThread().getName(); + try + { + Thread.currentThread().setName("NCS-"+connection.getRemoteAddress().toString()); + processConnection(connection); + } + finally + { + Thread.currentThread().setName(currentName); + } } }); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 5f99ddf56c..0d1fcb008a 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -61,7 +61,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private long _lastWriteTime = _createTime; private volatile boolean _transportBlockedForWriting; - private volatile boolean _messageAssignmentSuspended; + private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>(); + private final AtomicBoolean _stateChanged = new AtomicBoolean(); private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>(); @@ -81,13 +82,15 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol @Override public boolean isMessageAssignmentSuspended() { - return _messageAssignmentSuspended; + Thread lock = _messageAssignmentSuspended.get(); + return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread(); } @Override public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) { - _messageAssignmentSuspended = messageAssignmentSuspended; + _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null); + if(!messageAssignmentSuspended) { for(AMQSessionModel<?,?> session : _connection.getSessionModels()) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index f8ca3077b4..88412c3b70 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -209,19 +209,20 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private long _maxMessageSize; private volatile boolean _transportBlockedForWriting; - private volatile boolean _messageAssignmentSuspended; + private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>(); @Override public boolean isMessageAssignmentSuspended() { - return _messageAssignmentSuspended; + Thread lock = _messageAssignmentSuspended.get(); + return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread(); } @Override public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) { - _messageAssignmentSuspended = messageAssignmentSuspended; + _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null); if(!messageAssignmentSuspended) { for(AMQSessionModel<?,?> session : getSessionModels()) diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 3d7712d1ad..0078235990 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -144,7 +144,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private State _state = State.A; - private volatile boolean _messageAssignmentSuspended; + private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>(); @@ -166,13 +166,14 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut @Override public boolean isMessageAssignmentSuspended() { - return _messageAssignmentSuspended; + Thread lock = _messageAssignmentSuspended.get(); + return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread(); } @Override public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) { - _messageAssignmentSuspended = messageAssignmentSuspended; + _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null); if(!messageAssignmentSuspended) { diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java index 72dea9b18b..5357f4bcce 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java @@ -290,7 +290,8 @@ public class SSLTest extends QpidBrokerTestCase ByteArrayOutputStream bout = new ByteArrayOutputStream(); e.printStackTrace(new PrintStream(bout)); String strace = bout.toString(); - assertTrue("Correct exception not thrown", strace.contains(expectedString)); + assertTrue("Correct exception not thrown, expecting : " + expectedString + " got : " +e, + strace.contains(expectedString)); } public void testVerifyLocalHost() throws Exception diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index 8a4e22783f..cf28895799 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -147,6 +147,8 @@ public class QueueBrowserAutoAckTest extends QpidBrokerTestCase assertEquals("Session reports Queue expectedDepth not as expected", expectedDepth, queueDepth); + getLogger().debug("KWDEBUG : About to check queue depth using browser"); + // Browse the queue to get a second opinion int msgCount = 0; |
