diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-11 15:40:54 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-11 15:40:54 +0000 |
| commit | f75a0292a343f2d07b1b413486121999540fd64a (patch) | |
| tree | fa20a786f244fec063cb10c50a6159b755028d3d /qpid/java | |
| parent | e358bfc6da7aac88b4785be5d548acff79802ee7 (diff) | |
| download | qpid-python-f75a0292a343f2d07b1b413486121999540fd64a.tar.gz | |
More refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658990 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
12 files changed, 141 insertions, 20 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index a9f361d85c..6ccdec3436 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -161,7 +161,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget { _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch)); - getSessionModel().getConnectionModel().flushBatched(); + getSessionModel().getConnectionModel().notifyWork(); return entry.getMessage().getSize(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 96900d9a5a..353dcd98d6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -107,7 +107,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends void removeSessionListener(SessionModelListener listener); - void flushBatched(); + void notifyWork(); boolean isMessageAssignmentSuspended(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 3b7883b9b9..5c575006cf 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -215,6 +215,24 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine _delegate.processPendingMessages(); } + @Override + public boolean hasWork() + { + return _delegate.hasWork(); + } + + @Override + public void notifyWork() + { + _delegate.notifyWork(); + } + + @Override + public void clearWork() + { + _delegate.clearWork(); + } + private class ClosedDelegateProtocolEngine implements ServerProtocolEngine { @@ -236,6 +254,24 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } + @Override + public boolean hasWork() + { + return false; + } + + @Override + public void notifyWork() + { + + } + + @Override + public void clearWork() + { + + } + public SocketAddress getRemoteAddress() { return _network.getRemoteAddress(); @@ -369,6 +405,24 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine } + @Override + public boolean hasWork() + { + return false; + } + + @Override + public void notifyWork() + { + + } + + @Override + public void clearWork() + { + + } + public void received(ByteBuffer msg) { _lastReadTime = System.currentTimeMillis(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java index 529c493655..3a2abd45e7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java @@ -42,4 +42,10 @@ public interface ServerProtocolEngine extends ProtocolEngine boolean isMessageAssignmentSuspended(); void processPendingMessages(); + + boolean hasWork(); + + void clearWork(); + + void notifyWork(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java index 331c2e697d..dfa6b49e9b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java @@ -85,7 +85,6 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende private TransportEncryption _transportEncryption; private SSLEngineResult _status; private volatile boolean _fullyWritten = true; - private AtomicBoolean _stateChanged = new AtomicBoolean(); private boolean _workDone; @@ -180,7 +179,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende LOGGER.debug("Closing " + _remoteSocketAddress); if(_closed.compareAndSet(false,true)) { - _stateChanged.set(true); + _protocolEngine.notifyWork(); getSelector().wakeup(); } } @@ -256,12 +255,12 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende public boolean isStateChanged() { - return _stateChanged.get(); + return _protocolEngine.hasWork(); } public boolean doWork() { - _stateChanged.set(false); + _protocolEngine.clearWork(); boolean closed = _closed.get(); if (!closed) { @@ -287,7 +286,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0)) { - _stateChanged.set(true); + _protocolEngine.notifyWork(); } // tell all consumer targets that it is okay to accept more @@ -299,7 +298,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende LOGGER.debug("Closing " + _remoteSocketAddress); if(_closed.compareAndSet(false,true)) { - _stateChanged.set(true); + _protocolEngine.notifyWork(); getSelector().wakeup(); } } @@ -621,13 +620,12 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende } // append to list and do selector wakeup _buffers.add(msg); - _stateChanged.set(true); + _protocolEngine.notifyWork(); } @Override public void flush() { - _stateChanged.set(true); getSelector().wakeup(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 2939f930bf..53f1632933 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -624,7 +624,7 @@ public class MockConsumer implements ConsumerTarget } @Override - public void flushBatched() + public void notifyWork() { } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 5f227e5f18..e521754edf 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -24,6 +24,7 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.concurrent.atomic.AtomicBoolean; import javax.security.auth.Subject; @@ -59,6 +60,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private volatile boolean _transportBlockedForWriting; private volatile boolean _messageAssignmentSuspended; + private final AtomicBoolean _stateChanged = new AtomicBoolean(); public ProtocolEngine_0_10(ServerConnection conn, NetworkConnection network) @@ -289,4 +291,22 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol session.processPendingMessages(); } } + + @Override + public boolean hasWork() + { + return _stateChanged.get(); + } + + @Override + public void notifyWork() + { + _stateChanged.set(true); + } + + @Override + public void clearWork() + { + _stateChanged.set(false); + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index a2f1f1a4ba..dce656e7fd 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -687,8 +687,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } @Override - public void flushBatched() + public void notifyWork() { + _serverProtocolEngine.notifyWork(); + + // TODO getSender().flush(); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index ed075038e6..b1547f13e2 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -97,6 +97,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQConnectionModel<AMQProtocolEngine, AMQChannel>, ServerMethodProcessor<ServerChannelMethodProcessor> { + enum ConnectionState { INIT, @@ -118,6 +119,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private static final long AWAIT_CLOSED_TIMEOUT = 60000; private final AmqpPort<?> _port; private final long _creationTime; + private final AtomicBoolean _stateChanged = new AtomicBoolean(); private AMQShortString _contextKey; @@ -339,10 +341,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _closing.get(); } - public synchronized void flushBatched() - { - _sender.flush(); - } public ClientDeliveryMethod createDeliveryMethod(int channelId) @@ -2085,4 +2083,25 @@ public class AMQProtocolEngine implements ServerProtocolEngine, session.processPendingMessages(); } } + + @Override + public boolean hasWork() + { + return _stateChanged.get(); + } + + @Override + public void notifyWork() + { + _stateChanged.set(true); + + // TODO + _sender.flush(); + } + + @Override + public void clearWork() + { + _stateChanged.set(false); + } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index d33a4aafd8..f9560fa0d2 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.protocol.v0_8; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -34,7 +33,6 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.consumer.ConsumerMessageInstancePair; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; @@ -524,7 +522,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen { _channel.getConnection().setDeferFlush(false); - _channel.getConnection().flushBatched(); + _channel.getConnection().notifyWork(); } protected void addUnacknowledgedMessage(MessageInstance entry) diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index a442b5c437..09250ea6ac 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -500,8 +500,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } @Override - public void flushBatched() + public void notifyWork() { + _protocolEngine.notifyWork(); + + // TODO _protocolEngine.flushBatched(); } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index d361dce682..856aa14947 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -29,6 +29,7 @@ import java.security.PrivilegedAction; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import javax.security.auth.Subject; import javax.security.sasl.SaslException; @@ -82,6 +83,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private long _createTime = System.currentTimeMillis(); private ConnectionEndpoint _endpoint; private long _connectionId; + private final AtomicBoolean _stateChanged = new AtomicBoolean(); private static final ByteBuffer HEADER = ByteBuffer.wrap(new byte[] @@ -620,4 +622,22 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut session.processPendingMessages(); } } + + @Override + public boolean hasWork() + { + return _stateChanged.get(); + } + + @Override + public void notifyWork() + { + _stateChanged.set(true); + } + + @Override + public void clearWork() + { + _stateChanged.set(false); + } } |
