diff options
| author | Keith Wall <kwall@apache.org> | 2015-03-03 14:15:30 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-03-03 14:15:30 +0000 |
| commit | fe37626d4fd8fb3ee5b3146a5159024a3d6d3357 (patch) | |
| tree | 237c932ce0db01a0aa3b06fac9c6a06e0b4ed1ee /qpid/java | |
| parent | 6001ef6840f99b090fe5736921164d104c519b13 (diff) | |
| download | qpid-python-fe37626d4fd8fb3ee5b3146a5159024a3d6d3357.tar.gz | |
channel block/unblock now async, remove unnecessary selector bumps
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1663708 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
12 files changed, 73 insertions, 104 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index d488ccc138..3731ae262b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -116,5 +116,5 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo void transportStateChanged(); - void processPendingMessages(); + void processPending(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java index 3a32ddd632..9070d75e69 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java @@ -623,6 +623,8 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende @Override public void send(final ByteBuffer msg) { + assert Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX) : "Send called by unexpected thread " + Thread.currentThread().getName(); + if (_closed.get()) { throw new SenderClosedException("I/O for thread " + _remoteSocketAddress + " is already closed"); @@ -634,7 +636,5 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende @Override public void flush() { - getSelector().wakeup(); - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java index cc5ee71cf5..8d35e1a94f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java @@ -36,11 +36,9 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -/** -* Created by keith on 28/01/2015. -*/ public class SelectorThread extends Thread { + public static final String IO_THREAD_NAME_PREFIX = "NCS-"; private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>(); private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>(); @@ -289,7 +287,8 @@ public class SelectorThread extends Thread String currentName = Thread.currentThread().getName(); try { - Thread.currentThread().setName("NCS-"+connection.getRemoteAddress().toString()); + Thread.currentThread().setName( + IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString()); processConnection(connection); } finally diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index eafc969496..ca440bc432 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -488,7 +488,7 @@ public class MockConsumer implements ConsumerTarget } @Override - public void processPendingMessages() + public void processPending() { } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index a540318452..5affe3019c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -632,7 +632,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC public void flushBatched() { - _session.getConnection().flush(); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 0c2fc46a11..a6ed8c452a 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -70,7 +70,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { private final Broker<?> _broker; - private Runnable _onOpenTask; private AtomicBoolean _logClosed = new AtomicBoolean(false); private final Subject _authorizedSubject = new Subject(); @@ -79,10 +78,10 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private final long _connectionId; private final Object _reference = new Object(); private VirtualHostImpl<?,?,?> _virtualHost; - private AmqpPort<?> _port; - private AtomicLong _lastIoTime = new AtomicLong(); + private final AmqpPort<?> _port; + private final AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; - private Transport _transport; + private final Transport _transport; private final CopyOnWriteArrayList<Action<? super ServerConnection>> _connectionCloseTaskList = new CopyOnWriteArrayList<Action<? super ServerConnection>>(); @@ -95,7 +94,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private volatile boolean _stopped; private int _messageCompressionThreshold; - private int _maxMessageSize; + private final int _maxMessageSize; private ServerProtocolEngine _serverProtocolEngine; @@ -149,10 +148,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S if (state == State.OPEN) { - if (_onOpenTask != null) - { - _onOpenTask.run(); - } getEventLogger().message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), @@ -256,11 +251,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S return _stopped; } - public void onOpen(final Runnable task) - { - _onOpenTask = task; - } - public void closeSessionAsync(final ServerSession session, final AMQConstant cause, final String message) { addAsyncTask(new Action<ServerConnection>() @@ -740,7 +730,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S for (AMQSessionModel session : getSessionModels()) { - session.processPendingMessages(); + session.processPending(); } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 17cf8e7101..67204427fb 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -137,6 +137,7 @@ public class ServerSession extends Session private org.apache.qpid.server.model.Session<?> _modelObject; private long _blockTime; private long _blockingTimeout; + private boolean _wireBlockingState; public static interface MessageDispositionChangeListener { @@ -208,10 +209,6 @@ public class ServerSession extends Session if (state == State.OPEN) { getVirtualHost().getEventLogger().message(ChannelMessages.CREATE()); - if(_blocking.get()) - { - invokeBlock(); - } } } else @@ -245,6 +242,17 @@ public class ServerSession extends Session invoke(new MessageStop("")); } + private void invokeUnblock() + { + MessageFlow mf = new MessageFlow(); + mf.setUnit(MessageCreditUnit.MESSAGE); + mf.setDestination(""); + _outstandingCredit.set(Integer.MAX_VALUE); + mf.setValue(Integer.MAX_VALUE); + invoke(mf); + } + + @Override protected boolean isFull(int id) { @@ -824,12 +832,11 @@ public class ServerSession extends Session if(_blocking.compareAndSet(false,true)) { + getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); if(getState() == State.OPEN) { - invokeBlock(); + getConnection().notifyWork(); } - _blockTime = System.currentTimeMillis(); - getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); } @@ -853,24 +860,17 @@ public class ServerSession extends Session { if(_blocking.compareAndSet(true,false) && !isClosing()) { - _blockTime = 0l; getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); - MessageFlow mf = new MessageFlow(); - mf.setUnit(MessageCreditUnit.MESSAGE); - mf.setDestination(""); - _outstandingCredit.set(Integer.MAX_VALUE); - mf.setValue(Integer.MAX_VALUE); - invoke(mf); - - + getConnection().notifyWork(); } } } + boolean blockingTimeoutExceeded() { long blockTime = _blockTime; - boolean b = _blocking.get() && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout; + boolean b = _wireBlockingState && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout; return b; } @@ -1136,8 +1136,25 @@ public class ServerSession extends Session } @Override - public void processPendingMessages() + public void processPending() { + boolean desiredBlockingState = _blocking.get(); + if (desiredBlockingState != _wireBlockingState) + { + _wireBlockingState = desiredBlockingState; + + if (desiredBlockingState) + { + invokeBlock(); + } + else + { + invokeUnblock(); + } + _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0; + } + + for(ConsumerTarget target : getSubscriptions()) { target.processPending(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index d7e5857924..9631530f90 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -209,6 +209,8 @@ public class AMQChannel private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>(); private long _maxUncommittedInMemorySize; + private boolean _wireBlockingState; + public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) { _creditManager = new Pre0_10CreditManager(0l,0l, connection); @@ -1611,12 +1613,14 @@ public class AMQChannel { if(_blockingEntities.add(this)) { + if(_blocking.compareAndSet(false,true)) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **")); - flow(false); - _blockTime = System.currentTimeMillis(); + + + getConnection().notifyWork(); } } } @@ -1628,8 +1632,7 @@ public class AMQChannel if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false)) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); - - flow(true); + getConnection().notifyWork(); } } } @@ -1643,8 +1646,7 @@ public class AMQChannel if(_blocking.compareAndSet(false,true)) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName())); - flow(false); - _blockTime = System.currentTimeMillis(); + getConnection().notifyWork(); } } @@ -1657,7 +1659,7 @@ public class AMQChannel if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing()) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); - flow(true); + getConnection().notifyWork(); } } } @@ -2262,7 +2264,7 @@ public class AMQChannel private boolean blockingTimeoutExceeded() { - return _blocking.get() && (System.currentTimeMillis() - _blockTime) > _blockingTimeout; + return _wireBlockingState && (System.currentTimeMillis() - _blockTime) > _blockingTimeout; } @Override @@ -3598,9 +3600,17 @@ public class AMQChannel } @Override - public void processPendingMessages() + public void processPending() { + boolean desiredBlockingState = _blocking.get(); + if (desiredBlockingState != _wireBlockingState) + { + _wireBlockingState = desiredBlockingState; + flow(!desiredBlockingState); + _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0; + } + for(ConsumerTarget target : _tag2SubscriptionTargetMap.values()) { target.processPending(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index cb65424b67..659207d9e8 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -44,8 +44,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import javax.security.auth.Subject; import javax.security.sasl.SaslException; @@ -148,11 +146,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, * The channels that the latest call to {@link #received(ByteBuffer)} applied to. * Used so we know which channels we need to call {@link AMQChannel#receivedComplete()} * on after handling the frames. - * - * Thread-safety: guarded by {@link #_receivedLock}. */ - private final Set<AMQChannel> _channelsForCurrentMessage = - new HashSet<>(); + private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>(); private AMQDecoder _decoder; @@ -197,7 +192,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want? private boolean _blocking; - private final ReentrantLock _receivedLock; private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); private final Broker<?> _broker; private final Transport _transport; @@ -251,7 +245,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _port = port; _transport = transport; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); - _receivedLock = new ReentrantLock(); _decoder = new BrokerDecoder(this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); @@ -545,43 +538,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY]; - private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes); private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes); - private ByteBuffer asByteBuffer(AMQDataBlock block) - { - final int size = (int) block.getSize(); - - final byte[] data; - - - if(size > REUSABLE_BYTE_BUFFER_CAPACITY) - { - data= new byte[size]; - } - else - { - - data = _reusableBytes; - } - _reusableDataOutput.setBuffer(data); - - try - { - block.writePayload(_reusableDataOutput); - } - catch (IOException e) - { - throw new ServerScopedRuntimeException(e); - } - - final ByteBuffer copy = ByteBuffer.allocate(_reusableDataOutput.length()); - copy.put(data, 0, _reusableDataOutput.length()); - copy.flip(); - return copy; - } - - /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). @@ -1969,11 +1927,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _reference; } - public Lock getReceivedLock() - { - return _receivedLock; - } - @Override public long getLastReadTime() { @@ -2095,6 +2048,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, @Override public void processPending() { + + while(_asyncTaskList.peek() != null) { Action<? super AMQProtocolEngine> asyncAction = _asyncTaskList.poll(); @@ -2103,7 +2058,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, for (AMQSessionModel session : getSessionModels()) { - session.processPendingMessages(); + session.processPending(); } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 2bf2fc6d27..a2113de8ea 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -515,6 +515,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen if (isAutoClose()) { _needToClose.set(true); + getChannel().getConnection().notifyWork(); } } @@ -531,8 +532,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen public void flushBatched() { _channel.getConnection().setDeferFlush(false); - - _channel.getConnection().notifyWork(); } protected void addUnacknowledgedMessage(MessageInstance entry) diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 77932fa680..b515fda4a7 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -552,7 +552,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod for (AMQSessionModel session : getSessionModels()) { - session.processPendingMessages(); + session.processPending(); } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 0a29a70373..2a49e812f5 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -901,7 +901,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } @Override - public void processPendingMessages() + public void processPending() { for(Consumer<?> consumer : getConsumers()) { |
