summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-10 23:38:19 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-10 23:38:19 +0000
commitb9a7752b59c48e5177e069c6cd4461f3d3678685 (patch)
tree6866771d251197ed0fe4d2d51b2d1bbca7cae1f9 /qpid/java
parentf5ee46517eb096030a6c44b14b801eb2aaeb9392 (diff)
downloadqpid-python-b9a7752b59c48e5177e069c6cd4461f3d3678685.tar.gz
Force the queue runner to notice that the consumer is not now suspended
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658842 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java6
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java12
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java12
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java1
7 files changed, 46 insertions, 1 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index cad7b71fdd..a9f361d85c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -120,6 +120,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
}
}
+ @Override
public final void notifyCurrentState()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index b2e8cec315..32b12d2a44 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -46,6 +46,8 @@ public interface ConsumerTarget
void consumerRemoved(ConsumerImpl sub);
+ void notifyCurrentState();
+
void addStateListener(StateChangeListener<ConsumerTarget, State> listener);
long getUnacknowledgedBytes();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 8b424d2c9e..2939f930bf 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -217,6 +217,12 @@ public class MockConsumer implements ConsumerTarget
close();
}
+ @Override
+ public void notifyCurrentState()
+ {
+
+ }
+
public void setState(State state)
{
State oldState = _state;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 3e8ba7cfab..75a162deb8 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -30,7 +30,9 @@ import javax.security.auth.Subject;
import org.apache.log4j.Logger;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.transport.ByteBufferSender;
@@ -80,6 +82,16 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
_messageAssignmentSuspended = messageAssignmentSuspended;
+ if(!messageAssignmentSuspended)
+ {
+ for(AMQSessionModel<?,?> session : _connection.getSessionModels())
+ {
+ for(Consumer<?> consumer : session.getConsumers())
+ {
+ ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+ }
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 1b69edb50e..945e18e560 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -69,6 +69,7 @@ import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
@@ -215,6 +216,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
_messageAssignmentSuspended = messageAssignmentSuspended;
+
+ if(!messageAssignmentSuspended)
+ {
+ for(AMQSessionModel<?,?> session : getSessionModels())
+ {
+ for(Consumer<?> consumer : session.getConsumers())
+ {
+ ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+ }
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 57f070804a..22cf76a3ea 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -53,7 +53,9 @@ import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -165,6 +167,17 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
_messageAssignmentSuspended = messageAssignmentSuspended;
+
+ if(!messageAssignmentSuspended)
+ {
+ for(AMQSessionModel<?,?> session : _connection.getSessionModels())
+ {
+ for(Consumer<?> consumer : session.getConsumers())
+ {
+ ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+ }
+ }
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
index 2f3ba81285..7c7c6929d1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
@@ -50,7 +50,6 @@ public class NonBlockingConnection implements NetworkConnection
private Principal _principal;
private boolean _principalChecked;
private final Object _lock = new Object();
- private boolean _stateChanged;
public NonBlockingConnection(SocketChannel socketChannel,
ServerProtocolEngine delegate,