summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-09-16 10:06:55 +0000
committerAidan Skinner <aidan@apache.org>2009-09-16 10:06:55 +0000
commit9c4ecc45da929750ff7f0e0a5d7ada4e674b9105 (patch)
tree3834f1b7f1fe3fbdd632a9c78f6295e54595abc5 /qpid/java/common
parentc1ebe66bfab328c5192a35c21ea290b5c45f40f5 (diff)
downloadqpid-python-9c4ecc45da929750ff7f0e0a5d7ada4e674b9105.tar.gz
QPID-2105: Make NetworkDriver.open use a SSLContextFactory, not an SSLEngine.
Allow an existing SocketConnector to be passed into a MINANetworkDriver, for use with the ExistingSocket bit of TransportConnection. Move the ExistingSocket stuff to one place, use MINANetworkDriver in TransportConnection and make AMQProtocolHandler implement ProtocolEngine. Remove MINA specific gubbins from AMQProtocolHandler and AMQProtocolSession. Move fireAsynchEvent to Job Add getLocalAddress to AMQProtocolEngine Move TestNetworkDriver to common Use correct class for logger in AMQProtocolEngine Check the exception is thrown properly in SimpleACLTest, make it a little less prone to obscure race conditions. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@815704 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java40
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java108
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java122
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java13
7 files changed, 253 insertions, 46 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
index 4e4192dbe3..15d1c20ff1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -21,9 +21,13 @@
package org.apache.qpid.pool;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.mina.common.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation.
@@ -66,6 +70,8 @@ public class Job implements ReadWriteRunnable
private final boolean _readJob;
+ private final static Logger _logger = LoggerFactory.getLogger(Job.class);
+
/**
* Creates a new job that aggregates many continuations together.
*
@@ -181,4 +187,38 @@ public class Job implements ReadWriteRunnable
public void notCompleted(final Job job);
}
+
+ /**
+ * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
+ *
+ * @param job The job.
+ * @param event The event to hand off asynchronously.
+ */
+ public static void fireAsynchEvent(ExecutorService pool, Job job, Event event)
+ {
+
+ job.add(event);
+
+
+ if(pool == null)
+ {
+ return;
+ }
+
+ // rather than perform additional checks on pool to check that it hasn't shutdown.
+ // catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate())
+ {
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+ }
+
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 8ab845454a..5bfc189b02 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -37,6 +37,9 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
+
+ // Returns the local address of the NetworkDriver
+ SocketAddress getLocalAddress();
// Returns number of bytes written
long getWrittenBytes();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
index 34b0ef65be..86af97bf7e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
@@ -24,8 +24,6 @@ import java.net.BindException;
import java.net.InetAddress;
import java.net.SocketAddress;
-import javax.net.ssl.SSLEngine;
-
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.ssl.SSLContextFactory;
@@ -33,13 +31,14 @@ import org.apache.qpid.ssl.SSLContextFactory;
public interface NetworkDriver extends Sender<java.nio.ByteBuffer>
{
// Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to
- // it using the SSLEngine if provided
+ // it using the SSLContextFactory if provided
void open(int port, InetAddress destination, ProtocolEngine engine,
- NetworkDriverConfiguration config, SSLEngine sslEngine)
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory)
throws OpenException;
// listens for incoming connections on the specified ports and address and creates a new NetworkDriver which
- // processes incoming connections with ProtocolEngines created from factory using the SSLEngine if provided
+ // processes incoming connections with ProtocolEngines and SSLEngines created from the factories
+ // (in the case of an SSLContextFactory, if provided)
void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
index 8628b8c7aa..68fbb5e8ec 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
@@ -21,7 +21,9 @@
package org.apache.qpid.transport;
-public class OpenException extends Exception
+import java.io.IOException;
+
+public class OpenException extends IOException
{
public OpenException(String string, Throwable lastException)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
index e34103a944..7cc5f8e442 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
@@ -33,6 +33,7 @@ import javax.net.ssl.SSLEngine;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
@@ -71,7 +72,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
private int _processors = 4;
private boolean _executorPool = false;
private SSLContextFactory _sslFactory = null;
- private SocketConnector _socketConnector;
+ private IoConnector _socketConnector;
private IoAcceptor _acceptor;
private IoSession _ioSession;
private ProtocolEngineFactory _factory;
@@ -101,6 +102,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
_protectIO = protectIO;
_protocolEngine = protocolEngine;
_ioSession = session;
+ _ioSession.setAttachment(_protocolEngine);
}
public MINANetworkDriver()
@@ -108,6 +110,17 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
}
+ public MINANetworkDriver(IoConnector ioConnector)
+ {
+ _socketConnector = ioConnector;
+ }
+
+ public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine)
+ {
+ _socketConnector = ioConnector;
+ _protocolEngine = engine;
+ }
+
public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
{
@@ -188,8 +201,13 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
- SSLEngine sslEngine) throws OpenException
+ SSLContextFactory sslFactory) throws OpenException
{
+ if (sslFactory != null)
+ {
+ _sslFactory = sslFactory;
+ }
+
if (_useNIO)
{
_socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor());
@@ -207,7 +225,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
-
SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
@@ -229,7 +246,11 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
// one SocketConnector per connection at the moment anyway). This allows
// short-running
// clients (like unit tests) to complete quickly.
- _socketConnector.setWorkerTimeout(0);
+ if (_socketConnector instanceof SocketConnector)
+ {
+ ((SocketConnector) _socketConnector).setWorkerTimeout(0);
+ }
+
ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg);
future.join();
if (!future.isConnected())
@@ -333,56 +354,54 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
public void sessionCreated(IoSession protocolSession) throws Exception
{
- if (_acceptingConnections)
+ // Configure the session with SSL if necessary
+ SessionUtil.initialize(protocolSession);
+ if (_executorPool)
{
- // Configure the session with SSL if necessary
- SessionUtil.initialize(protocolSession);
- if (_executorPool)
+ if (_sslFactory != null)
{
- if (_sslFactory != null)
- {
- protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
+ protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
}
- else
+ }
+ else
+ {
+ if (_sslFactory != null)
{
- if (_sslFactory != null)
- {
- protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
+ protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
}
+ }
+ // Do we want to have read/write buffer limits?
+ if (_protectIO)
+ {
+ //Add IO Protection Filters
+ IoFilterChain chain = protocolSession.getFilterChain();
- // Do we want to have read/write buffer limits?
- if (_protectIO)
- {
- //Add IO Protection Filters
- IoFilterChain chain = protocolSession.getFilterChain();
+ protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
- protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+ ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+ readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
+ readfilter.attach(chain);
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
- readfilter.attach(chain);
+ WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+ writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
+ writefilter.attach(chain);
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
- writefilter.attach(chain);
+ protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+ }
- protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
- }
-
- if (_ioSession == null)
- {
- _ioSession = protocolSession;
- }
-
+ if (_ioSession == null)
+ {
+ _ioSession = protocolSession;
+ }
+
+ if (_acceptingConnections)
+ {
// Set up the protocol engine
ProtocolEngine protocolEngine = _factory.newProtocolEngine(this);
MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession);
protocolEngine.setNetworkDriver(newDriver);
- protocolSession.setAttachment(protocolEngine);
}
}
@@ -409,4 +428,13 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
_acceptingConnections = acceptingConnections;
}
+ public void setProtocolEngine(ProtocolEngine protocolEngine)
+ {
+ _protocolEngine = protocolEngine;
+ if (_ioSession != null)
+ {
+ _ioSession.setAttachment(protocolEngine);
+ }
+ }
+
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
new file mode 100644
index 0000000000..a4c4b59cdd
--- /dev/null
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+
+/**
+ * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
+ * so if this class is being used and some methods are to be used, then please update those.
+ */
+public class TestNetworkDriver implements NetworkDriver
+{
+ private final ConcurrentMap attributes = new ConcurrentHashMap();
+ private String _remoteAddress = "127.0.0.1";
+ private String _localAddress = "127.0.0.1";
+ private int _port = 1;
+
+ public TestNetworkDriver()
+ {
+ }
+
+ public void setRemoteAddress(String string)
+ {
+ this._remoteAddress = string;
+ }
+
+ public void setPort(int _port)
+ {
+ this._port = _port;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+
+ public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
+ {
+
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return new InetSocketAddress(_localAddress, _port);
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return new InetSocketAddress(_remoteAddress, _port);
+ }
+
+ public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
+ SSLContextFactory sslFactory) throws OpenException
+ {
+
+ }
+
+ public void setMaxReadIdle(int idleTime)
+ {
+
+ }
+
+ public void setMaxWriteIdle(int idleTime)
+ {
+
+ }
+
+ public void close()
+ {
+
+ }
+
+ public void flush()
+ {
+
+ }
+
+ public void send(ByteBuffer msg)
+ {
+
+ }
+
+ public void setIdleTimeout(long l)
+ {
+
+ }
+
+ public void setLocalAddress(String localAddress)
+ {
+ _localAddress = localAddress;
+ }
+
+}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
index 6024875cf5..5500ff9d4b 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
@@ -382,6 +382,18 @@ public class MINANetworkDriverTest extends TestCase
return null;
}
}
+
+ public SocketAddress getLocalAddress()
+ {
+ if (_driver != null)
+ {
+ return _driver.getLocalAddress();
+ }
+ else
+ {
+ return null;
+ }
+ }
public long getWrittenBytes()
{
@@ -459,6 +471,7 @@ public class MINANetworkDriverTest extends TestCase
{
return _closed;
}
+
}
private class EchoProtocolEngine extends CountingProtocolEngine