diff options
| author | Keith Wall <kwall@apache.org> | 2015-02-10 18:10:16 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-02-10 18:10:16 +0000 |
| commit | f5ee46517eb096030a6c44b14b801eb2aaeb9392 (patch) | |
| tree | 25544486642cc770061489663dba650d85769404 /qpid/java/broker-plugins/amqp-0-10-protocol | |
| parent | 085486ebe5ff21133b9caf1c31625ac6ea356568 (diff) | |
| download | qpid-python-f5ee46517eb096030a6c44b14b801eb2aaeb9392.tar.gz | |
Refactoring: make the queue no longer be responsible for pushing messages onto the wire
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658773 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol')
4 files changed, 50 insertions, 3 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index afa4fb8bc0..209f6663ec 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -104,7 +104,8 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC _name = name; } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension } @@ -195,7 +196,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final AddMessageDispositionListenerAction _postIdSettingAction; - public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) { ServerMessage serverMsg = entry.getMessage(); @@ -346,7 +347,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC { recordUnacknowledged(entry); } - return size; } void recordUnacknowledged(MessageInstance entry) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 401c6fc939..3e8ba7cfab 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -32,6 +32,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.network.Assembler; @@ -55,6 +56,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private long _lastWriteTime = _createTime; private volatile boolean _transportBlockedForWriting; + private volatile boolean _messageAssignmentSuspended; + public ProtocolEngine_0_10(ServerConnection conn, NetworkConnection network) { @@ -67,6 +70,20 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } } + @Override + public boolean isMessageAssignmentSuspended() + { + return _messageAssignmentSuspended; + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended = messageAssignmentSuspended; + } + + + public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender) { if(!getSubject().equals(Subject.getSubject(AccessController.getContext()))) @@ -252,4 +269,12 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol _connection.transportStateChanged(); } + @Override + public void processPendingMessages() + { + for (AMQSessionModel session : _connection.getSessionModels()) + { + session.processPendingMessages(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index cbd569d036..d9b4495d6e 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -685,4 +685,17 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S ssn.transportStateChanged(); } } + + @Override + public void flushBatched() + { + getSender().flush(); + } + + + @Override + public boolean isMessageAssignmentSuspended() + { + return _serverProtocolEngine.isMessageAssignmentSuspended(); + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 1d8676edd6..3659d6ce01 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -1135,6 +1135,15 @@ public class ServerSession extends Session } } + @Override + public void processPendingMessages() + { + for(ConsumerTarget target : getSubscriptions()) + { + target.processPendingMessages(); + } + } + public final long getMaxUncommittedInMemorySize() { |
