summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-12 15:18:16 +0000
committerKeith Wall <kwall@apache.org>2015-02-12 15:18:16 +0000
commite243745d439671418016a2be1570209269b45070 (patch)
tree7f17111b385b7759869e2e5594e591979097cc6f /qpid/java
parent825aceb7e885c793309557a3a886f10c475c4c1c (diff)
downloadqpid-python-e243745d439671418016a2be1570209269b45070.tar.gz
0-10 queue browser fix.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1659288 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java11
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java9
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java7
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java7
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java3
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java2
7 files changed, 29 insertions, 11 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 4fb89575aa..450d4d98d5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -316,6 +316,7 @@ class QueueConsumerImpl
public final void flush()
{
_queue.flushConsumer(this);
+ _target.processPending();
}
public boolean resend(final QueueEntry entry)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
index 786f1915a7..cc5ee71cf5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
@@ -286,7 +286,16 @@ public class SelectorThread extends Thread
@Override
public void run()
{
- processConnection(connection);
+ String currentName = Thread.currentThread().getName();
+ try
+ {
+ Thread.currentThread().setName("NCS-"+connection.getRemoteAddress().toString());
+ processConnection(connection);
+ }
+ finally
+ {
+ Thread.currentThread().setName(currentName);
+ }
}
});
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 5f99ddf56c..0d1fcb008a 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -61,7 +61,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private long _lastWriteTime = _createTime;
private volatile boolean _transportBlockedForWriting;
- private volatile boolean _messageAssignmentSuspended;
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
+
private final AtomicBoolean _stateChanged = new AtomicBoolean();
private final AtomicReference<Action<ServerProtocolEngine>> _workListener = new AtomicReference<>();
@@ -81,13 +82,15 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
@Override
public boolean isMessageAssignmentSuspended()
{
- return _messageAssignmentSuspended;
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
}
@Override
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
- _messageAssignmentSuspended = messageAssignmentSuspended;
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
+
if(!messageAssignmentSuspended)
{
for(AMQSessionModel<?,?> session : _connection.getSessionModels())
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index f8ca3077b4..88412c3b70 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -209,19 +209,20 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private long _maxMessageSize;
private volatile boolean _transportBlockedForWriting;
- private volatile boolean _messageAssignmentSuspended;
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
@Override
public boolean isMessageAssignmentSuspended()
{
- return _messageAssignmentSuspended;
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
}
@Override
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
- _messageAssignmentSuspended = messageAssignmentSuspended;
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
if(!messageAssignmentSuspended)
{
for(AMQSessionModel<?,?> session : getSessionModels())
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 3d7712d1ad..0078235990 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -144,7 +144,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private State _state = State.A;
- private volatile boolean _messageAssignmentSuspended;
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
@@ -166,13 +166,14 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
@Override
public boolean isMessageAssignmentSuspended()
{
- return _messageAssignmentSuspended;
+ Thread lock = _messageAssignmentSuspended.get();
+ return lock != null && _messageAssignmentSuspended.get() != Thread.currentThread();
}
@Override
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
- _messageAssignmentSuspended = messageAssignmentSuspended;
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
if(!messageAssignmentSuspended)
{
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
index 72dea9b18b..5357f4bcce 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
@@ -290,7 +290,8 @@ public class SSLTest extends QpidBrokerTestCase
ByteArrayOutputStream bout = new ByteArrayOutputStream();
e.printStackTrace(new PrintStream(bout));
String strace = bout.toString();
- assertTrue("Correct exception not thrown", strace.contains(expectedString));
+ assertTrue("Correct exception not thrown, expecting : " + expectedString + " got : " +e,
+ strace.contains(expectedString));
}
public void testVerifyLocalHost() throws Exception
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
index 8a4e22783f..cf28895799 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -147,6 +147,8 @@ public class QueueBrowserAutoAckTest extends QpidBrokerTestCase
assertEquals("Session reports Queue expectedDepth not as expected", expectedDepth, queueDepth);
+ getLogger().debug("KWDEBUG : About to check queue depth using browser");
+
// Browse the queue to get a second opinion
int msgCount = 0;