summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-10-protocol
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-02-10 18:10:16 +0000
committerKeith Wall <kwall@apache.org>2015-02-10 18:10:16 +0000
commitf5ee46517eb096030a6c44b14b801eb2aaeb9392 (patch)
tree25544486642cc770061489663dba650d85769404 /qpid/java/broker-plugins/amqp-0-10-protocol
parent085486ebe5ff21133b9caf1c31625ac6ea356568 (diff)
downloadqpid-python-f5ee46517eb096030a6c44b14b801eb2aaeb9392.tar.gz
Refactoring: make the queue no longer be responsible for pushing messages onto the wire
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658773 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java6
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java25
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java13
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java9
4 files changed, 50 insertions, 3 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index afa4fb8bc0..209f6663ec 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -104,7 +104,8 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
_name = name;
}
- public boolean isSuspended()
+ @Override
+ public boolean doIsSuspended()
{
return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
}
@@ -195,7 +196,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
private final AddMessageDispositionListenerAction _postIdSettingAction;
- public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
+ public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
{
ServerMessage serverMsg = entry.getMessage();
@@ -346,7 +347,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
{
recordUnacknowledged(entry);
}
- return size;
}
void recordUnacknowledged(MessageInstance entry)
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 401c6fc939..3e8ba7cfab 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -32,6 +32,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.network.Assembler;
@@ -55,6 +56,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private long _lastWriteTime = _createTime;
private volatile boolean _transportBlockedForWriting;
+ private volatile boolean _messageAssignmentSuspended;
+
public ProtocolEngine_0_10(ServerConnection conn,
NetworkConnection network)
{
@@ -67,6 +70,20 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
}
}
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _messageAssignmentSuspended;
+ }
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+ {
+ _messageAssignmentSuspended = messageAssignmentSuspended;
+ }
+
+
+
public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender)
{
if(!getSubject().equals(Subject.getSubject(AccessController.getContext())))
@@ -252,4 +269,12 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
_connection.transportStateChanged();
}
+ @Override
+ public void processPendingMessages()
+ {
+ for (AMQSessionModel session : _connection.getSessionModels())
+ {
+ session.processPendingMessages();
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index cbd569d036..d9b4495d6e 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -685,4 +685,17 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
ssn.transportStateChanged();
}
}
+
+ @Override
+ public void flushBatched()
+ {
+ getSender().flush();
+ }
+
+
+ @Override
+ public boolean isMessageAssignmentSuspended()
+ {
+ return _serverProtocolEngine.isMessageAssignmentSuspended();
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 1d8676edd6..3659d6ce01 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -1135,6 +1135,15 @@ public class ServerSession extends Session
}
}
+ @Override
+ public void processPendingMessages()
+ {
+ for(ConsumerTarget target : getSubscriptions())
+ {
+ target.processPendingMessages();
+ }
+ }
+
public final long getMaxUncommittedInMemorySize()
{