summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-11 15:40:54 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-11 15:40:54 +0000
commitf75a0292a343f2d07b1b413486121999540fd64a (patch)
treefa20a786f244fec063cb10c50a6159b755028d3d /qpid/java
parente358bfc6da7aac88b4785be5d548acff79802ee7 (diff)
downloadqpid-python-f75a0292a343f2d07b1b413486121999540fd64a.tar.gz
More refactoring
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658990 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java2
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java54
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java14
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java2
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java20
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java27
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java4
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java5
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java20
12 files changed, 141 insertions, 20 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index a9f361d85c..6ccdec3436 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -161,7 +161,7 @@ public abstract class AbstractConsumerTarget implements ConsumerTarget
{
_queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
- getSessionModel().getConnectionModel().flushBatched();
+ getSessionModel().getConnectionModel().notifyWork();
return entry.getMessage().getSize();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 96900d9a5a..353dcd98d6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -107,7 +107,7 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
void removeSessionListener(SessionModelListener listener);
- void flushBatched();
+ void notifyWork();
boolean isMessageAssignmentSuspended();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 3b7883b9b9..5c575006cf 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -215,6 +215,24 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_delegate.processPendingMessages();
}
+ @Override
+ public boolean hasWork()
+ {
+ return _delegate.hasWork();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _delegate.notifyWork();
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _delegate.clearWork();
+ }
+
private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
{
@@ -236,6 +254,24 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
+ @Override
+ public boolean hasWork()
+ {
+ return false;
+ }
+
+ @Override
+ public void notifyWork()
+ {
+
+ }
+
+ @Override
+ public void clearWork()
+ {
+
+ }
+
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
@@ -369,6 +405,24 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
+ @Override
+ public boolean hasWork()
+ {
+ return false;
+ }
+
+ @Override
+ public void notifyWork()
+ {
+
+ }
+
+ @Override
+ public void clearWork()
+ {
+
+ }
+
public void received(ByteBuffer msg)
{
_lastReadTime = System.currentTimeMillis();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
index 529c493655..3a2abd45e7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
@@ -42,4 +42,10 @@ public interface ServerProtocolEngine extends ProtocolEngine
boolean isMessageAssignmentSuspended();
void processPendingMessages();
+
+ boolean hasWork();
+
+ void clearWork();
+
+ void notifyWork();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
index 331c2e697d..dfa6b49e9b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -85,7 +85,6 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
private TransportEncryption _transportEncryption;
private SSLEngineResult _status;
private volatile boolean _fullyWritten = true;
- private AtomicBoolean _stateChanged = new AtomicBoolean();
private boolean _workDone;
@@ -180,7 +179,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
LOGGER.debug("Closing " + _remoteSocketAddress);
if(_closed.compareAndSet(false,true))
{
- _stateChanged.set(true);
+ _protocolEngine.notifyWork();
getSelector().wakeup();
}
}
@@ -256,12 +255,12 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
public boolean isStateChanged()
{
- return _stateChanged.get();
+ return _protocolEngine.hasWork();
}
public boolean doWork()
{
- _stateChanged.set(false);
+ _protocolEngine.clearWork();
boolean closed = _closed.get();
if (!closed)
{
@@ -287,7 +286,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
if(dataRead || (_workDone && _netInputBuffer != null && _netInputBuffer.position() != 0))
{
- _stateChanged.set(true);
+ _protocolEngine.notifyWork();
}
// tell all consumer targets that it is okay to accept more
@@ -299,7 +298,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
LOGGER.debug("Closing " + _remoteSocketAddress);
if(_closed.compareAndSet(false,true))
{
- _stateChanged.set(true);
+ _protocolEngine.notifyWork();
getSelector().wakeup();
}
}
@@ -621,13 +620,12 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
}
// append to list and do selector wakeup
_buffers.add(msg);
- _stateChanged.set(true);
+ _protocolEngine.notifyWork();
}
@Override
public void flush()
{
- _stateChanged.set(true);
getSelector().wakeup();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 2939f930bf..53f1632933 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -624,7 +624,7 @@ public class MockConsumer implements ConsumerTarget
}
@Override
- public void flushBatched()
+ public void notifyWork()
{
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 5f227e5f18..e521754edf 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -24,6 +24,7 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.auth.Subject;
@@ -59,6 +60,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private volatile boolean _transportBlockedForWriting;
private volatile boolean _messageAssignmentSuspended;
+ private final AtomicBoolean _stateChanged = new AtomicBoolean();
public ProtocolEngine_0_10(ServerConnection conn,
NetworkConnection network)
@@ -289,4 +291,22 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
session.processPendingMessages();
}
}
+
+ @Override
+ public boolean hasWork()
+ {
+ return _stateChanged.get();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _stateChanged.set(true);
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _stateChanged.set(false);
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index a2f1f1a4ba..dce656e7fd 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -687,8 +687,11 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
}
@Override
- public void flushBatched()
+ public void notifyWork()
{
+ _serverProtocolEngine.notifyWork();
+
+ // TODO
getSender().flush();
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index ed075038e6..b1547f13e2 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -97,6 +97,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
ServerMethodProcessor<ServerChannelMethodProcessor>
{
+
enum ConnectionState
{
INIT,
@@ -118,6 +119,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
private static final long AWAIT_CLOSED_TIMEOUT = 60000;
private final AmqpPort<?> _port;
private final long _creationTime;
+ private final AtomicBoolean _stateChanged = new AtomicBoolean();
private AMQShortString _contextKey;
@@ -339,10 +341,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
return _closing.get();
}
- public synchronized void flushBatched()
- {
- _sender.flush();
- }
public ClientDeliveryMethod createDeliveryMethod(int channelId)
@@ -2085,4 +2083,25 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
session.processPendingMessages();
}
}
+
+ @Override
+ public boolean hasWork()
+ {
+ return _stateChanged.get();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _stateChanged.set(true);
+
+ // TODO
+ _sender.flush();
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _stateChanged.set(false);
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index d33a4aafd8..f9560fa0d2 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.protocol.v0_8;
import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -34,7 +33,6 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.consumer.ConsumerMessageInstancePair;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
@@ -524,7 +522,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen
{
_channel.getConnection().setDeferFlush(false);
- _channel.getConnection().flushBatched();
+ _channel.getConnection().notifyWork();
}
protected void addUnacknowledgedMessage(MessageInstance entry)
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index a442b5c437..09250ea6ac 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -500,8 +500,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
}
@Override
- public void flushBatched()
+ public void notifyWork()
{
+ _protocolEngine.notifyWork();
+
+ // TODO
_protocolEngine.flushBatched();
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index d361dce682..856aa14947 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -29,6 +29,7 @@ import java.security.PrivilegedAction;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.auth.Subject;
import javax.security.sasl.SaslException;
@@ -82,6 +83,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private long _createTime = System.currentTimeMillis();
private ConnectionEndpoint _endpoint;
private long _connectionId;
+ private final AtomicBoolean _stateChanged = new AtomicBoolean();
private static final ByteBuffer HEADER =
ByteBuffer.wrap(new byte[]
@@ -620,4 +622,22 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
session.processPendingMessages();
}
}
+
+ @Override
+ public boolean hasWork()
+ {
+ return _stateChanged.get();
+ }
+
+ @Override
+ public void notifyWork()
+ {
+ _stateChanged.set(true);
+ }
+
+ @Override
+ public void clearWork()
+ {
+ _stateChanged.set(false);
+ }
}