From 0314cbe225dce796e09ae9abbd450323808fe493 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Tue, 18 Aug 2009 16:16:10 +0000 Subject: QPID-2024: Add ProtocolEngine and NetworkDriver interfaces and a NetworkDriver implementation that uses MINA. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@805477 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/protocol/ProtocolEngine.java | 62 ++++ .../qpid/protocol/ProtocolEngineFactory.java | 29 ++ .../org/apache/qpid/thread/QpidThreadExecutor.java | 43 +++ .../org/apache/qpid/transport/NetworkDriver.java | 61 ++++ .../qpid/transport/NetworkDriverConfiguration.java | 44 +++ .../org/apache/qpid/transport/OpenException.java | 32 ++ .../transport/network/mina/MINANetworkDriver.java | 379 +++++++++++++++++++++ 7 files changed, 650 insertions(+) create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java (limited to 'qpid/java/common/src/main') diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java new file mode 100644 index 0000000000..8ab845454a --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.protocol; + +import java.net.SocketAddress; + +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.Receiver; + +/** + * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received + * decodes it and then process the result. + */ +public interface ProtocolEngine extends Receiver +{ + // Sets the network driver providing data for this ProtocolEngine + void setNetworkDriver (NetworkDriver driver); + + // Returns the remote address of the NetworkDriver + SocketAddress getRemoteAddress(); + + // Returns number of bytes written + long getWrittenBytes(); + + // Returns number of bytes read + long getReadBytes(); + + // Called by the NetworkDriver when the socket has been closed for reading + void closed(); + + // Called when the NetworkEngine has not written data for the specified period of time (will trigger a + // heartbeat) + void writerIdle(); + + // Called when the NetworkEngine has not read data for the specified period of time (will close the connection) + void readerIdle(); + + /** + * Accepts an AMQFrame for writing to the network. The ProtocolEngine encodes the frame into bytes and + * passes the data onto the NetworkDriver for sending + */ + void writeFrame(AMQDataBlock frame); +} \ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java new file mode 100644 index 0000000000..d8c0f2c916 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.protocol; + +public interface ProtocolEngineFactory +{ + + // Returns a new instance of a ProtocolEngine + ProtocolEngine newProtocolEngine(); + +} \ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java new file mode 100644 index 0000000000..376658bb99 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.thread; + +import org.apache.qpid.thread.Threading; + +import edu.emory.mathcs.backport.java.util.concurrent.Executor; + +public class QpidThreadExecutor implements Executor +{ + @Override + public void execute(Runnable command) + { + try + { + Threading.getThreadFactory().createThread(command).start(); + } + catch(Exception e) + { + throw new RuntimeException("Error creating a thread using Qpid thread factory",e); + } + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java new file mode 100644 index 0000000000..d45cee8004 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.net.BindException; +import java.net.InetAddress; +import java.net.SocketAddress; + +import javax.net.ssl.SSLEngine; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; + +public interface NetworkDriver extends Sender +{ + // Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to + // it using the SSLEngine if provided + void open(int port, InetAddress destination, ProtocolEngine engine, + NetworkDriverConfiguration config, SSLEngine sslEngine) + throws OpenException; + + // listens for incoming connections on the specified ports and address and creates a new NetworkDriver which + // processes incoming connections with ProtocolEngines created from factory using the SSLEngine if provided + void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, + NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException; + + // Returns the remote address of underlying socket + SocketAddress getRemoteAddress(); + + /** + * The length of time after which the ProtocolEngines readIdle() method should be called if no data has been + * read in seconds + */ + void setMaxReadIdle(int idleTime); + + /** + * The length of time after which the ProtocolEngines writeIdle() method should be called if no data has been + * written in seconds + */ + void setMaxWriteIdle(int idleTime); + +} \ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java new file mode 100644 index 0000000000..18cae6bf85 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +/** + * This interface provides a means for NetworkDrivers to configure TCP options such as incoming and outgoing + * buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned + * from here if the underlying implementation supports them. + */ +public interface NetworkDriverConfiguration +{ + // Taken from Socket + boolean getKeepAlive(); + boolean getOOBInline(); + boolean getReuseAddress(); + Integer getSoLinger(); // null means off + int getSoTimeout(); + boolean getTcpNoDelay(); + int getTrafficClass(); + + // The amount of memory in bytes to allocate to the incoming buffer + int getReceiveBufferSize(); + + // The amount of memory in bytes to allocate to the outgoing buffer + int getSendBufferSize(); +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java new file mode 100644 index 0000000000..8628b8c7aa --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport; + +public class OpenException extends Exception +{ + + public OpenException(String string, Throwable lastException) + { + super(string, lastException); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java new file mode 100644 index 0000000000..96fb7b1ef8 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -0,0 +1,379 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.mina; + +import java.io.IOException; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import javax.net.ssl.SSLEngine; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoFilterChain; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoSessionConfig; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.common.WriteFuture; +import org.apache.mina.filter.ReadThrottleFilterBuilder; +import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.WriteBufferLimitFilterBuilder; +import org.apache.mina.filter.executor.ExecutorFilter; +import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.mina.util.NewThreadExecutor; +import org.apache.mina.util.SessionUtil; +import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.thread.QpidThreadExecutor; +import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.NetworkDriverConfiguration; +import org.apache.qpid.transport.OpenException; + +public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver +{ + + private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; + + ProtocolEngine _protocolEngine; + private boolean _useNIO = false; + private int _processors = 4; + private boolean _executorPool = false; + private SSLContextFactory _sslFactory = null; + private SocketConnector _socketConnector; + private IoAcceptor _acceptor; + private IoSession _ioSession; + private ProtocolEngineFactory _factory; + private boolean _protectIO; + private NetworkDriverConfiguration _config; + private Throwable _lastException; + private boolean _acceptingConnections = false; + + public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO) + { + _useNIO = useNIO; + _processors = processors; + _executorPool = executorPool; + _protectIO = protectIO; + } + + public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO, + ProtocolEngine protocolEngine, IoSession session) + { + _useNIO = useNIO; + _processors = processors; + _executorPool = executorPool; + _protectIO = protectIO; + _protocolEngine = protocolEngine; + _ioSession = session; + } + + public MINANetworkDriver() + { + + } + + public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory, + NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException + { + + _factory = factory; + _config = config; + + if (_useNIO) + { + _acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors, + new NewThreadExecutor()); + } + else + { + _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor()); + } + + SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); + SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); + + if (config != null) + { + sc.setReceiveBufferSize(config.getReceiveBufferSize()); + sc.setSendBufferSize(config.getSendBufferSize()); + sc.setTcpNoDelay(config.getTcpNoDelay()); + } + + // if we do not use the executor pool threading model we get the default + // leader follower + // implementation provided by MINA + if (_executorPool) + { + sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); + } + + if (sslFactory != null) + { + _sslFactory = sslFactory; + } + + if (addresses != null && addresses.length > 0) + { + for (InetAddress addr : addresses) + { + try + { + _acceptor.bind(new InetSocketAddress(addr, port), this, sconfig); + } + catch (IOException e) + { + throw new BindException(String.format("Could not bind to {0}:{2}", addr, port)); + } + } + } + else + { + try + { + _acceptor.bind(new InetSocketAddress(port), this, sconfig); + } + catch (IOException e) + { + throw new BindException(String.format("Could not bind to *:{1}", port)); + } + } + _acceptingConnections = true; + } + + public SocketAddress getRemoteAddress() + { + return _ioSession.getRemoteAddress(); + } + + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, + SSLEngine sslEngine) throws OpenException + { + if (_useNIO) + { + _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor()); + } + else + { + _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking + // connector + } + + org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); + // the MINA default is currently to use the pooled allocator although this may change in future + // once more testing of the performance of the simple allocator has been done + if (!Boolean.getBoolean("amqj.enablePooledAllocator")) + { + org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + } + + + SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig(); + + // if we do not use our own thread model we get the MINA default which is to use + // its own leader-follower model + boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); + if (readWriteThreading) + { + cfg.setThreadModel(ReadWriteThreadModel.getInstance()); + } + + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true); + scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE); + scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE); + + // Don't have the connector's worker thread wait around for other + // connections (we only use + // one SocketConnector per connection at the moment anyway). This allows + // short-running + // clients (like unit tests) to complete quickly. + _socketConnector.setWorkerTimeout(0); + ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg); + future.join(); + if (!future.isConnected()) + { + throw new OpenException("Could not open connection", _lastException); + } + _ioSession = future.getSession(); + ReadWriteThreadModel.getInstance().getAsynchronousReadFilter().createNewJobForSession(_ioSession); + ReadWriteThreadModel.getInstance().getAsynchronousWriteFilter().createNewJobForSession(_ioSession); + _ioSession.setAttachment(engine); + engine.setNetworkDriver(this); + _protocolEngine = engine; + } + + public void setMaxReadIdle(int idleTime) + { + _ioSession.setIdleTime(IdleStatus.READER_IDLE, idleTime); + } + + public void setMaxWriteIdle(int idleTime) + { + _ioSession.setIdleTime(IdleStatus.WRITER_IDLE, idleTime); + } + + public void close() + { + if (_acceptor != null) + { + _acceptor.unbindAll(); + } + if (_ioSession != null) + { + _ioSession.close(); + } + } + + public void flush() + { + // MINA doesn't support flush + } + + public void send(ByteBuffer msg) + { + WriteFuture future = _ioSession.write(org.apache.mina.common.ByteBuffer.wrap(msg)); + future.join(); + } + + public void setIdleTimeout(long l) + { + // MINA doesn't support setting SO_TIMEOUT + } + + public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception + { + if (_protocolEngine != null) + { + _protocolEngine.exception(throwable); + } + _lastException = throwable; + } + + /** + * Invoked when a message is received on a particular protocol session. Note + * that a protocol session is directly tied to a particular physical + * connection. + * + * @param protocolSession + * the protocol session that received the message + * @param message + * the message itself (i.e. a decoded frame) + * + * @throws Exception + * if the message cannot be processed + */ + public void messageReceived(IoSession protocolSession, Object message) throws Exception + { + if (message instanceof org.apache.mina.common.ByteBuffer) + { + ((ProtocolEngine) protocolSession.getAttachment()).received(((org.apache.mina.common.ByteBuffer) message).buf()); + } + else + { + throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message); + } + } + + public void sessionClosed(IoSession protocolSession) throws Exception + { + ((ProtocolEngine) protocolSession.getAttachment()).closed(); + } + + public void sessionCreated(IoSession protocolSession) throws Exception + { + if (_acceptingConnections) + { + // Configure the session with SSL if necessary + SessionUtil.initialize(protocolSession); + if (_executorPool) + { + if (_sslFactory != null) + { + protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); + } + } + else + { + if (_sslFactory != null) + { + protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); + } + } + + // Do we want to have read/write buffer limits? + if (_protectIO) + { + //Add IO Protection Filters + IoFilterChain chain = protocolSession.getFilterChain(); + + protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize()); + readfilter.attach(chain); + + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize()); + writefilter.attach(chain); + + protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + } + + // Set up the protocol engine + ProtocolEngine protocolEngine = _factory.newProtocolEngine(); + MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession); + protocolEngine.setNetworkDriver(newDriver); + protocolSession.setAttachment(protocolEngine); + } + } + + public void sessionIdle(IoSession session, IdleStatus status) throws Exception + { + if (IdleStatus.WRITER_IDLE.equals(status)) + { + ((ProtocolEngine) session.getAttachment()).writerIdle(); + } + else if (IdleStatus.READER_IDLE.equals(status)) + { + ((ProtocolEngine) session.getAttachment()).readerIdle(); + } + } + + private ProtocolEngine getProtocolEngine() + { + return _protocolEngine; + } + +} -- cgit v1.2.1 From f0051104b5b99601507c578bd0a7b819a76aef55 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 19 Aug 2009 14:03:25 +0000 Subject: QPID-2024: Change send to stash the future and have flush join on that so that it only returns when all data has been written. Add getLocalAddress. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@805809 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/transport/NetworkDriver.java | 5 ++++- .../qpid/transport/network/mina/MINANetworkDriver.java | 15 ++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) (limited to 'qpid/java/common/src/main') diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java index d45cee8004..34b0ef65be 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java @@ -43,9 +43,12 @@ public interface NetworkDriver extends Sender void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException; - // Returns the remote address of underlying socket + // Returns the remote address of the underlying socket SocketAddress getRemoteAddress(); + // Returns the local address of the underlying socket + SocketAddress getLocalAddress(); + /** * The length of time after which the ProtocolEngines readIdle() method should be called if no data has been * read in seconds diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index 96fb7b1ef8..7330a042df 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -78,6 +78,8 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver private Throwable _lastException; private boolean _acceptingConnections = false; + private WriteFuture _lastWriteFuture; + public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO) { _useNIO = useNIO; @@ -174,6 +176,11 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { return _ioSession.getRemoteAddress(); } + + public SocketAddress getLocalAddress() + { + return _ioSession.getLocalAddress(); + } public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, SSLEngine sslEngine) throws OpenException @@ -256,13 +263,15 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver public void flush() { - // MINA doesn't support flush + if (_lastWriteFuture != null) + { + _lastWriteFuture.join(); + } } public void send(ByteBuffer msg) { - WriteFuture future = _ioSession.write(org.apache.mina.common.ByteBuffer.wrap(msg)); - future.join(); + _lastWriteFuture = _ioSession.write(org.apache.mina.common.ByteBuffer.wrap(msg)); } public void setIdleTimeout(long l) -- cgit v1.2.1 From a7be8fc7337b5cc093f593cc1becb9fe7b4dc0fb Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Tue, 1 Sep 2009 16:27:52 +0000 Subject: QPID-2025: Add a AMQProtocolEngine from the de-MINAfied AMQMinaProtocolSession. Remove various now-unused classes and update references. Add tests for AMQDecoder. Net -1500 lines, +25% performance on transient messaging. Nice. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@810110 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/codec/AMQCodecFactory.java | 7 +- .../java/org/apache/qpid/codec/AMQDecoder.java | 79 ++++++++++++++++++++-- .../apache/qpid/framing/AMQDataBlockDecoder.java | 39 +++++++---- .../apache/qpid/framing/AMQDataBlockEncoder.java | 2 +- .../apache/qpid/framing/ProtocolInitiation.java | 20 +++--- .../src/main/java/org/apache/qpid/pool/Event.java | 58 +++++++++------- .../src/main/java/org/apache/qpid/pool/Job.java | 34 ++++------ .../java/org/apache/qpid/pool/PoolingFilter.java | 27 ++++---- .../org/apache/qpid/pool/ReadWriteRunnable.java | 1 - .../qpid/protocol/ProtocolEngineFactory.java | 4 +- .../qpid/transport/NetworkDriverConfiguration.java | 16 ++--- .../transport/network/mina/MINANetworkDriver.java | 20 +++++- 12 files changed, 203 insertions(+), 104 deletions(-) (limited to 'qpid/java/common/src/main') diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java index fa890d0ebb..591dbd085b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java @@ -23,6 +23,7 @@ package org.apache.qpid.codec; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to @@ -50,9 +51,9 @@ public class AMQCodecFactory implements ProtocolCodecFactory * @param expectProtocolInitiation true if the first frame received is going to be a protocol initiation * frame, false if it is going to be a standard AMQ data block. */ - public AMQCodecFactory(boolean expectProtocolInitiation) + public AMQCodecFactory(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session) { - _frameDecoder = new AMQDecoder(expectProtocolInitiation); + _frameDecoder = new AMQDecoder(expectProtocolInitiation, session); } /** @@ -70,7 +71,7 @@ public class AMQCodecFactory implements ProtocolCodecFactory * * @return The AMQP decoder. */ - public ProtocolDecoder getDecoder() + public AMQDecoder getDecoder() { return _frameDecoder; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index 7eef73f337..281c0761d9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -20,14 +20,21 @@ */ package org.apache.qpid.codec; +import java.util.ArrayList; + import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBodyFactory; +import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a @@ -62,14 +69,19 @@ public class AMQDecoder extends CumulativeProtocolDecoder private boolean _expectProtocolInitiation; private boolean firstDecode = true; + private AMQMethodBodyFactory _bodyFactory; + + private ByteBuffer _remainingBuf; + /** * Creates a new AMQP decoder. * * @param expectProtocolInitiation true if this decoder needs to handle protocol initiation. */ - public AMQDecoder(boolean expectProtocolInitiation) + public AMQDecoder(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session) { _expectProtocolInitiation = expectProtocolInitiation; + _bodyFactory = new AMQMethodBodyFactory(session); } /** @@ -120,7 +132,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { int pos = in.position(); - boolean enoughData = _dataBlockDecoder.decodable(session, in); + boolean enoughData = _dataBlockDecoder.decodable(in.buf()); in.position(pos); if (!enoughData) { @@ -149,7 +161,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder */ private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - boolean enoughData = _piDecoder.decodable(session, in); + boolean enoughData = _piDecoder.decodable(in.buf()); if (!enoughData) { // returning false means it will leave the contents in the buffer and @@ -158,7 +170,8 @@ public class AMQDecoder extends CumulativeProtocolDecoder } else { - _piDecoder.decode(session, in, out); + ProtocolInitiation pi = new ProtocolInitiation(in.buf()); + out.write(pi); return true; } @@ -177,7 +190,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder } - /** + /** * Cumulates content of in into internal buffer and forwards * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. * doDecode() is invoked repeatedly until it returns false @@ -268,4 +281,60 @@ public class AMQDecoder extends CumulativeProtocolDecoder session.setAttribute( BUFFER, remainingBuf ); } + public ArrayList decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException + { + + // get prior remaining data from accumulator + ArrayList dataBlocks = new ArrayList(); + ByteBuffer msg; + // if we have a session buffer, append data to that otherwise + // use the buffer read from the network directly + if( _remainingBuf != null ) + { + _remainingBuf.put(buf); + _remainingBuf.flip(); + msg = _remainingBuf; + } + else + { + msg = ByteBuffer.wrap(buf); + } + + if (_expectProtocolInitiation + || (firstDecode + && (msg.remaining() > 0) + && (msg.get(msg.position()) == (byte)'A'))) + { + if (_piDecoder.decodable(msg.buf())) + { + dataBlocks.add(new ProtocolInitiation(msg.buf())); + } + } + else + { + boolean enoughData = true; + while (enoughData) + { + int pos = msg.position(); + + enoughData = _dataBlockDecoder.decodable(msg); + msg.position(pos); + if (enoughData) + { + dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg)); + } + else + { + _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false); + _remainingBuf.setAutoExpand(true); + _remainingBuf.put(msg); + } + } + } + if(firstDecode && dataBlocks.size() > 0) + { + firstDecode = false; + } + return dataBlocks; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java index 82ffc60802..228867b2b0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java @@ -47,7 +47,7 @@ public class AMQDataBlockDecoder public AMQDataBlockDecoder() { } - public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException + public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException { final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1); // type, channel, body length and end byte @@ -56,14 +56,15 @@ public class AMQDataBlockDecoder return false; } - in.skip(1 + 2); - final long bodySize = in.getUnsignedInt(); + in.position(in.position() + 1 + 2); + // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() + final long bodySize = in.getInt() & 0xffffffffL; return (remainingAfterAttributes >= bodySize); } - protected Object createAndPopulateFrame(IoSession session, ByteBuffer in) + public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in) throws AMQFrameDecodingException, AMQProtocolVersionException { final byte type = in.get(); @@ -71,15 +72,7 @@ public class AMQDataBlockDecoder BodyFactory bodyFactory; if (type == AMQMethodBody.TYPE) { - bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); - if (bodyFactory == null) - { - AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); - bodyFactory = new AMQMethodBodyFactory(protocolSession); - session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory); - - } - + bodyFactory = methodBodyFactory; } else { @@ -115,6 +108,24 @@ public class AMQDataBlockDecoder public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - out.write(createAndPopulateFrame(session, in)); + AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); + if (bodyFactory == null) + { + AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); + bodyFactory = new AMQMethodBodyFactory(protocolSession); + session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory); + } + + out.write(createAndPopulateFrame(bodyFactory, in)); + } + + public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException + { + return decodable(msg.buf()); + } + + public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException + { + return createAndPopulateFrame(factory, ByteBuffer.wrap(msg)); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java index 05fd2bb480..374644b4f2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java @@ -50,7 +50,7 @@ public final class AMQDataBlockEncoder implements MessageEncoder { _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'"); } - + out.write(buffer); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index 3ac17e9204..cf8a866e47 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -20,12 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.qpid.AMQException; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock { @@ -53,13 +51,12 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData _protocolMajor = protocolMajor; _protocolMinor = protocolMinor; } - + public ProtocolInitiation(ProtocolVersion pv) { this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion()); } - public ProtocolInitiation(ByteBuffer in) { _protocolHeader = new byte[4]; @@ -71,6 +68,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData _protocolMinor = in.get(); } + public void writePayload(org.apache.mina.common.ByteBuffer buffer) + { + writePayload(buffer.buf()); + } + public long getSize() { return 4 + 1 + 1 + 1 + 1; @@ -127,16 +129,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData * @return true if we have enough data to decode the PI frame fully, false if more * data is required */ - public boolean decodable(IoSession session, ByteBuffer in) + public boolean decodable(ByteBuffer in) { return (in.remaining() >= 8); } - public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) - { - ProtocolInitiation pi = new ProtocolInitiation(in); - out.write(pi); - } } public ProtocolVersion checkVersion() throws AMQException @@ -192,4 +189,5 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData buffer.append(Integer.toHexString(_protocolMinor)); return buffer.toString(); } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java index 5996cbf89c..49bce9f2f9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java @@ -45,21 +45,31 @@ import org.apache.mina.common.IoSession; * a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract, * it is really an interface, so could just drop it and use the continuation interface instead. */ -public abstract class Event +public class Event { + private Runnable _runner; + + public Event() + { + + } + /** * Creates a continuation. */ - public Event() - { } + public Event(Runnable runner) + { + _runner = runner; + } /** - * Processes the continuation in the context of a Mina session. - * - * @param session The Mina session. + * Processes the continuation */ - public abstract void process(IoSession session); - + public void process() + { + _runner.run(); + } + /** * A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter. * @@ -68,22 +78,22 @@ public abstract class Event * Pass a Mina messageReceived event to a NextFilter. {@link IoFilter.NextFilter}, {@link IoSession} * */ - public static final class ReceivedEvent extends Event + public static final class MinaReceivedEvent extends Event { private final Object _data; - private final IoFilter.NextFilter _nextFilter; + private final IoSession _session; - public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data) + public MinaReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data, final IoSession session) { - super(); _nextFilter = nextFilter; _data = data; + _session = session; } - public void process(IoSession session) + public void process() { - _nextFilter.messageReceived(session, _data); + _nextFilter.messageReceived(_session, _data); } public IoFilter.NextFilter getNextFilter() @@ -101,21 +111,22 @@ public abstract class Event * {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession} * */ - public static final class WriteEvent extends Event + public static final class MinaWriteEvent extends Event { private final IoFilter.WriteRequest _data; private final IoFilter.NextFilter _nextFilter; + private IoSession _session; - public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data) + public MinaWriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data, final IoSession session) { - super(); _nextFilter = nextFilter; _data = data; + _session = session; } - public void process(IoSession session) + public void process() { - _nextFilter.filterWrite(session, _data); + _nextFilter.filterWrite(_session, _data); } public IoFilter.NextFilter getNextFilter() @@ -135,16 +146,17 @@ public abstract class Event public static final class CloseEvent extends Event { private final IoFilter.NextFilter _nextFilter; + private final IoSession _session; - public CloseEvent(final IoFilter.NextFilter nextFilter) + public CloseEvent(final IoFilter.NextFilter nextFilter, final IoSession session) { - super(); _nextFilter = nextFilter; + _session = session; } - public void process(IoSession session) + public void process() { - _nextFilter.sessionClosed(session); + _nextFilter.sessionClosed(_session); } public IoFilter.NextFilter getNextFilter() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java index 00da005515..4e4192dbe3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -55,9 +55,6 @@ public class Job implements ReadWriteRunnable /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ private final int _maxEvents; - /** The Mina session. */ - private final IoSession _session; - /** Holds the queue of events that make up the job. */ private final java.util.Queue _eventQueue = new ConcurrentLinkedQueue(); @@ -79,7 +76,13 @@ public class Job implements ReadWriteRunnable */ Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob) { - _session = session; + _completionHandler = completionHandler; + _maxEvents = maxEvents; + _readJob = readJob; + } + + public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob) + { _completionHandler = completionHandler; _maxEvents = maxEvents; _readJob = readJob; @@ -90,7 +93,7 @@ public class Job implements ReadWriteRunnable * * @param evt The continuation to enqueue. */ - void add(Event evt) + public void add(Event evt) { _eventQueue.add(evt); } @@ -111,7 +114,7 @@ public class Job implements ReadWriteRunnable } else { - e.process(_session); + e.process(); } } return false; @@ -153,30 +156,19 @@ public class Job implements ReadWriteRunnable if(processAll()) { deactivate(); - _completionHandler.completed(_session, this); + _completionHandler.completed(this); } else { - _completionHandler.notCompleted(_session, this); + _completionHandler.notCompleted(this); } } - public boolean isReadJob() - { - return _readJob; - } - public boolean isRead() { return _readJob; } - public boolean isWrite() - { - return !_readJob; - } - - /** * Another interface for a continuation. * @@ -185,8 +177,8 @@ public class Job implements ReadWriteRunnable */ static interface JobCompletionHandler { - public void completed(IoSession session, Job job); + public void completed(Job job); - public void notCompleted(final IoSession session, final Job job); + public void notCompleted(final Job job); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index a080cc7e04..4863611c42 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -20,19 +20,17 @@ */ package org.apache.qpid.pool; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; + import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoSession; import org.apache.qpid.pool.Event.CloseEvent; - +import org.apache.qpid.pool.Event.MinaReceivedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ExecutorService; - /** * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it * adds no behaviour by default to the filter chain, it is abstract. @@ -74,7 +72,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo private final String _name; /** Defines the maximum number of events that will be batched into a single job. */ - static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); private final int _maxEvents; @@ -188,7 +186,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter); session.setAttribute(_name, job); } - + /** * Retrieves this filters Job, by this filters name, from the Mina session. * @@ -208,7 +206,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo * @param session The Mina session to work in. * @param job The job that completed. */ - public void completed(IoSession session, Job job) + public void completed(Job job) { @@ -239,7 +237,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo } } - public void notCompleted(IoSession session, Job job) + public void notCompleted(Job job) { final ExecutorService pool = _poolReference.getPool(); @@ -430,7 +428,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) { Job job = getJobForSession(session); - fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message)); + fireAsynchEvent(job, new MinaReceivedEvent(nextFilter, message, session)); } /** @@ -442,7 +440,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void sessionClosed(final NextFilter nextFilter, final IoSession session) { Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter)); + fireAsynchEvent(job, new CloseEvent(nextFilter, session)); } } @@ -473,7 +471,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) { Job job = getJobForSession(session); - fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest)); + fireAsynchEvent(job, new Event.MinaWriteEvent(nextFilter, writeRequest, session)); } /** @@ -485,7 +483,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void sessionClosed(final NextFilter nextFilter, final IoSession session) { Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter)); + fireAsynchEvent(job, new CloseEvent(nextFilter, session)); } } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java index ad04a923e1..140c93ca8d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java @@ -23,5 +23,4 @@ package org.apache.qpid.pool; public interface ReadWriteRunnable extends Runnable { boolean isRead(); - boolean isWrite(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java index d8c0f2c916..9df84eef90 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java @@ -20,10 +20,12 @@ */ package org.apache.qpid.protocol; +import org.apache.qpid.transport.NetworkDriver; + public interface ProtocolEngineFactory { // Returns a new instance of a ProtocolEngine - ProtocolEngine newProtocolEngine(); + ProtocolEngine newProtocolEngine(NetworkDriver networkDriver); } \ No newline at end of file diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java index 18cae6bf85..c38afe5dd5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java @@ -28,17 +28,17 @@ package org.apache.qpid.transport; public interface NetworkDriverConfiguration { // Taken from Socket - boolean getKeepAlive(); - boolean getOOBInline(); - boolean getReuseAddress(); + Boolean getKeepAlive(); + Boolean getOOBInline(); + Boolean getReuseAddress(); Integer getSoLinger(); // null means off - int getSoTimeout(); - boolean getTcpNoDelay(); - int getTrafficClass(); + Integer getSoTimeout(); + Boolean getTcpNoDelay(); + Integer getTrafficClass(); // The amount of memory in bytes to allocate to the incoming buffer - int getReceiveBufferSize(); + Integer getReceiveBufferSize(); // The amount of memory in bytes to allocate to the outgoing buffer - int getSendBufferSize(); + Integer getSendBufferSize(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index 7330a042df..477e2cd5af 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -181,6 +181,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { return _ioSession.getLocalAddress(); } + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, SSLEngine sslEngine) throws OpenException @@ -251,6 +252,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver public void close() { + if (_lastWriteFuture != null) + { + _lastWriteFuture.join(); + } if (_acceptor != null) { _acceptor.unbindAll(); @@ -359,9 +364,14 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); } - + + if (_ioSession == null) + { + _ioSession = protocolSession; + } + // Set up the protocol engine - ProtocolEngine protocolEngine = _factory.newProtocolEngine(); + ProtocolEngine protocolEngine = _factory.newProtocolEngine(this); MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession); protocolEngine.setNetworkDriver(newDriver); protocolSession.setAttachment(protocolEngine); @@ -385,4 +395,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver return _protocolEngine; } + public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections) + { + _factory = engineFactory; + _acceptingConnections = acceptingConnections; + } + } -- cgit v1.2.1 From 7b28732091473d93ce7546c70fa1d2dbd685161a Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Fri, 4 Sep 2009 09:40:32 +0000 Subject: QPID-2025: Log errors instead of printStackTracing() git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@811326 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/transport/network/mina/MINANetworkDriver.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'qpid/java/common/src/main') diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index 477e2cd5af..8df3644929 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -30,13 +30,13 @@ import java.nio.ByteBuffer; import javax.net.ssl.SSLEngine; +import org.apache.log4j.Logger; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; -import org.apache.mina.common.IoSessionConfig; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.common.WriteFuture; import org.apache.mina.filter.ReadThrottleFilterBuilder; @@ -80,6 +80,8 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver private WriteFuture _lastWriteFuture; + private static final Logger _logger = Logger.getLogger(MINANetworkDriver.class); + public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO) { _useNIO = useNIO; @@ -289,6 +291,10 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver if (_protocolEngine != null) { _protocolEngine.exception(throwable); + } + else + { + _logger.error("Exception thrown and no ProtocolEngine to handle it", throwable); } _lastException = throwable; } -- cgit v1.2.1 From c1ebe66bfab328c5192a35c21ea290b5c45f40f5 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 9 Sep 2009 13:05:43 +0000 Subject: Merge from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@812936 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/transport/network/mina/MINANetworkDriver.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'qpid/java/common/src/main') diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index 8df3644929..e34103a944 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -30,7 +30,6 @@ import java.nio.ByteBuffer; import javax.net.ssl.SSLEngine; -import org.apache.log4j.Logger; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoAcceptor; @@ -59,6 +58,9 @@ import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.NetworkDriverConfiguration; import org.apache.qpid.transport.OpenException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { @@ -80,7 +82,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver private WriteFuture _lastWriteFuture; - private static final Logger _logger = Logger.getLogger(MINANetworkDriver.class); + private static final Logger _logger = LoggerFactory.getLogger(MINANetworkDriver.class); public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO) { -- cgit v1.2.1 From 9c4ecc45da929750ff7f0e0a5d7ada4e674b9105 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 16 Sep 2009 10:06:55 +0000 Subject: QPID-2105: Make NetworkDriver.open use a SSLContextFactory, not an SSLEngine. Allow an existing SocketConnector to be passed into a MINANetworkDriver, for use with the ExistingSocket bit of TransportConnection. Move the ExistingSocket stuff to one place, use MINANetworkDriver in TransportConnection and make AMQProtocolHandler implement ProtocolEngine. Remove MINA specific gubbins from AMQProtocolHandler and AMQProtocolSession. Move fireAsynchEvent to Job Add getLocalAddress to AMQProtocolEngine Move TestNetworkDriver to common Use correct class for logger in AMQProtocolEngine Check the exception is thrown properly in SimpleACLTest, make it a little less prone to obscure race conditions. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@815704 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/pool/Job.java | 40 ++++++++ .../org/apache/qpid/protocol/ProtocolEngine.java | 3 + .../org/apache/qpid/transport/NetworkDriver.java | 9 +- .../org/apache/qpid/transport/OpenException.java | 4 +- .../transport/network/mina/MINANetworkDriver.java | 108 +++++++++++++-------- 5 files changed, 118 insertions(+), 46 deletions(-) (limited to 'qpid/java/common/src/main') diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java index 4e4192dbe3..15d1c20ff1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -21,9 +21,13 @@ package org.apache.qpid.pool; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.mina.common.IoSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation. @@ -66,6 +70,8 @@ public class Job implements ReadWriteRunnable private final boolean _readJob; + private final static Logger _logger = LoggerFactory.getLogger(Job.class); + /** * Creates a new job that aggregates many continuations together. * @@ -181,4 +187,38 @@ public class Job implements ReadWriteRunnable public void notCompleted(final Job job); } + + /** + * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. + * + * @param job The job. + * @param event The event to hand off asynchronously. + */ + public static void fireAsynchEvent(ExecutorService pool, Job job, Event event) + { + + job.add(event); + + + if(pool == null) + { + return; + } + + // rather than perform additional checks on pool to check that it hasn't shutdown. + // catch the RejectedExecutionException that will result from executing on a shutdown pool + if (job.activate()) + { + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } + + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 8ab845454a..5bfc189b02 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -37,6 +37,9 @@ public interface ProtocolEngine extends Receiver // Returns the remote address of the NetworkDriver SocketAddress getRemoteAddress(); + + // Returns the local address of the NetworkDriver + SocketAddress getLocalAddress(); // Returns number of bytes written long getWrittenBytes(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java index 34b0ef65be..86af97bf7e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java @@ -24,8 +24,6 @@ import java.net.BindException; import java.net.InetAddress; import java.net.SocketAddress; -import javax.net.ssl.SSLEngine; - import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.ssl.SSLContextFactory; @@ -33,13 +31,14 @@ import org.apache.qpid.ssl.SSLContextFactory; public interface NetworkDriver extends Sender { // Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to - // it using the SSLEngine if provided + // it using the SSLContextFactory if provided void open(int port, InetAddress destination, ProtocolEngine engine, - NetworkDriverConfiguration config, SSLEngine sslEngine) + NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws OpenException; // listens for incoming connections on the specified ports and address and creates a new NetworkDriver which - // processes incoming connections with ProtocolEngines created from factory using the SSLEngine if provided + // processes incoming connections with ProtocolEngines and SSLEngines created from the factories + // (in the case of an SSLContextFactory, if provided) void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java index 8628b8c7aa..68fbb5e8ec 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java @@ -21,7 +21,9 @@ package org.apache.qpid.transport; -public class OpenException extends Exception +import java.io.IOException; + +public class OpenException extends IOException { public OpenException(String string, Throwable lastException) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index e34103a944..7cc5f8e442 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -33,6 +33,7 @@ import javax.net.ssl.SSLEngine; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoAcceptor; +import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; @@ -71,7 +72,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver private int _processors = 4; private boolean _executorPool = false; private SSLContextFactory _sslFactory = null; - private SocketConnector _socketConnector; + private IoConnector _socketConnector; private IoAcceptor _acceptor; private IoSession _ioSession; private ProtocolEngineFactory _factory; @@ -101,6 +102,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver _protectIO = protectIO; _protocolEngine = protocolEngine; _ioSession = session; + _ioSession.setAttachment(_protocolEngine); } public MINANetworkDriver() @@ -108,6 +110,17 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver } + public MINANetworkDriver(IoConnector ioConnector) + { + _socketConnector = ioConnector; + } + + public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine) + { + _socketConnector = ioConnector; + _protocolEngine = engine; + } + public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory, NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException { @@ -188,8 +201,13 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, - SSLEngine sslEngine) throws OpenException + SSLContextFactory sslFactory) throws OpenException { + if (sslFactory != null) + { + _sslFactory = sslFactory; + } + if (_useNIO) { _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor()); @@ -207,7 +225,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver { org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); } - SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig(); @@ -229,7 +246,11 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver // one SocketConnector per connection at the moment anyway). This allows // short-running // clients (like unit tests) to complete quickly. - _socketConnector.setWorkerTimeout(0); + if (_socketConnector instanceof SocketConnector) + { + ((SocketConnector) _socketConnector).setWorkerTimeout(0); + } + ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg); future.join(); if (!future.isConnected()) @@ -333,56 +354,54 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver public void sessionCreated(IoSession protocolSession) throws Exception { - if (_acceptingConnections) + // Configure the session with SSL if necessary + SessionUtil.initialize(protocolSession); + if (_executorPool) { - // Configure the session with SSL if necessary - SessionUtil.initialize(protocolSession); - if (_executorPool) + if (_sslFactory != null) { - if (_sslFactory != null) - { - protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", - new SSLFilter(_sslFactory.buildServerContext())); - } + protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); } - else + } + else + { + if (_sslFactory != null) { - if (_sslFactory != null) - { - protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", - new SSLFilter(_sslFactory.buildServerContext())); - } + protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); } + } + // Do we want to have read/write buffer limits? + if (_protectIO) + { + //Add IO Protection Filters + IoFilterChain chain = protocolSession.getFilterChain(); - // Do we want to have read/write buffer limits? - if (_protectIO) - { - //Add IO Protection Filters - IoFilterChain chain = protocolSession.getFilterChain(); + protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); - protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); + ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); + readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize()); + readfilter.attach(chain); - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize()); - readfilter.attach(chain); + WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); + writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize()); + writefilter.attach(chain); - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize()); - writefilter.attach(chain); + protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); + } - protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); - } - - if (_ioSession == null) - { - _ioSession = protocolSession; - } - + if (_ioSession == null) + { + _ioSession = protocolSession; + } + + if (_acceptingConnections) + { // Set up the protocol engine ProtocolEngine protocolEngine = _factory.newProtocolEngine(this); MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession); protocolEngine.setNetworkDriver(newDriver); - protocolSession.setAttachment(protocolEngine); } } @@ -409,4 +428,13 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver _acceptingConnections = acceptingConnections; } + public void setProtocolEngine(ProtocolEngine protocolEngine) + { + _protocolEngine = protocolEngine; + if (_ioSession != null) + { + _ioSession.setAttachment(protocolEngine); + } + } + } -- cgit v1.2.1 From 93fa7d17feecb3d27cead67e11b250af1fcc595e Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 16 Sep 2009 11:32:28 +0000 Subject: QPID-2015: Remove AMQIoTransportProtocolSession. Release the executor service in the same class as it's acquired git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@815729 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java | 3 --- 1 file changed, 3 deletions(-) (limited to 'qpid/java/common/src/main') diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index 4863611c42..4e02ac3a55 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -136,9 +136,6 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo public void destroy() { _logger.debug("Destroy called on PoolingFilter " + toString()); - - // When the reference count gets to zero we release the executor service. - _poolReference.releaseExecutorService(); } /** -- cgit v1.2.1 From 31bbc100ac6b3a31eb25d29f407d60ff23334d1f Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Thu, 17 Sep 2009 15:19:54 +0000 Subject: QPID-2024 QPID-2105: Remove now unnecessary classes like Event, PoolingFilter, ReadWriteThreadModel. Move the couple of necessary methods to Job. Fix imports. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@816232 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/pool/Event.java | 167 ------- .../src/main/java/org/apache/qpid/pool/Job.java | 111 +++-- .../java/org/apache/qpid/pool/PoolingFilter.java | 487 --------------------- .../org/apache/qpid/pool/ReadWriteThreadModel.java | 102 ----- .../transport/network/mina/MINANetworkDriver.java | 22 - 5 files changed, 70 insertions(+), 819 deletions(-) delete mode 100644 qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java delete mode 100644 qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java (limited to 'qpid/java/common/src/main') diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java deleted file mode 100644 index 49bce9f2f9..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.pool; - -import org.apache.mina.common.IoFilter; -import org.apache.mina.common.IoSession; - -/** - * An Event is a continuation, which is used to break a Mina filter chain and save the current point in the chain - * for later processing. It is an abstract class, with different implementations for continuations of different kinds - * of Mina events. - * - *

These continuations are typically batched by {@link Job} for processing by a worker thread pool. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Process a continuation in the context of a Mina session. - *
- * - * @todo Pull up _nextFilter and getNextFilter into Event, as all events use it. Inner classes need to be non-static - * to use instance variables in the parent. Consequently they need to be non-inner to be instantiable outside of - * the context of the outer Event class. The inner class construction used here is preventing common code re-use - * (though not by a huge amount), but makes for an inelegent way of handling inheritance and doesn't seem like - * a justifiable use of inner classes. Move the inner classes out into their own files. - * - * @todo Could make Event implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as - * a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract, - * it is really an interface, so could just drop it and use the continuation interface instead. - */ -public class Event -{ - private Runnable _runner; - - public Event() - { - - } - - /** - * Creates a continuation. - */ - public Event(Runnable runner) - { - _runner = runner; - } - - /** - * Processes the continuation - */ - public void process() - { - _runner.run(); - } - - /** - * A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Pass a Mina messageReceived event to a NextFilter. {@link IoFilter.NextFilter}, {@link IoSession} - *
- */ - public static final class MinaReceivedEvent extends Event - { - private final Object _data; - private final IoFilter.NextFilter _nextFilter; - private final IoSession _session; - - public MinaReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data, final IoSession session) - { - _nextFilter = nextFilter; - _data = data; - _session = session; - } - - public void process() - { - _nextFilter.messageReceived(_session, _data); - } - - public IoFilter.NextFilter getNextFilter() - { - return _nextFilter; - } - } - - /** - * A continuation ({@link Event}) that takes a Mina filterWrite event, and passes it to a NextFilter. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Pass a Mina filterWrite event to a NextFilter. - * {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession} - *
- */ - public static final class MinaWriteEvent extends Event - { - private final IoFilter.WriteRequest _data; - private final IoFilter.NextFilter _nextFilter; - private IoSession _session; - - public MinaWriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data, final IoSession session) - { - _nextFilter = nextFilter; - _data = data; - _session = session; - } - - public void process() - { - _nextFilter.filterWrite(_session, _data); - } - - public IoFilter.NextFilter getNextFilter() - { - return _nextFilter; - } - } - - /** - * A continuation ({@link Event}) that takes a Mina sessionClosed event, and passes it to a NextFilter. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Pass a Mina sessionClosed event to a NextFilter. {@link IoFilter.NextFilter}, {@link IoSession} - *
- */ - public static final class CloseEvent extends Event - { - private final IoFilter.NextFilter _nextFilter; - private final IoSession _session; - - public CloseEvent(final IoFilter.NextFilter nextFilter, final IoSession session) - { - _nextFilter = nextFilter; - _session = session; - } - - public void process() - { - _nextFilter.sessionClosed(_session); - } - - public IoFilter.NextFilter getNextFilter() - { - return _nextFilter; - } - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java index 15d1c20ff1..82b600de88 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.mina.common.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,40 +55,28 @@ import org.slf4j.LoggerFactory; */ public class Job implements ReadWriteRunnable { + + /** Defines the maximum number of events that will be batched into a single job. */ + public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ private final int _maxEvents; /** Holds the queue of events that make up the job. */ - private final java.util.Queue _eventQueue = new ConcurrentLinkedQueue(); + private final java.util.Queue _eventQueue = new ConcurrentLinkedQueue(); /** Holds a status flag, that indicates when the job is actively running. */ private final AtomicBoolean _active = new AtomicBoolean(); - /** Holds the completion continuation, called upon completion of a run of the job. */ - private final JobCompletionHandler _completionHandler; - private final boolean _readJob; + private ReferenceCountingExecutorService _poolReference; + private final static Logger _logger = LoggerFactory.getLogger(Job.class); - /** - * Creates a new job that aggregates many continuations together. - * - * @param session The Mina session. - * @param completionHandler The per job run, terminal continuation. - * @param maxEvents The maximum number of aggregated continuations to process per run of the job. - * @param readJob - */ - Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob) - { - _completionHandler = completionHandler; - _maxEvents = maxEvents; - _readJob = readJob; - } - - public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob) + public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob) { - _completionHandler = completionHandler; + _poolReference = poolReference; _maxEvents = maxEvents; _readJob = readJob; } @@ -99,7 +86,7 @@ public class Job implements ReadWriteRunnable * * @param evt The continuation to enqueue. */ - public void add(Event evt) + public void add(Runnable evt) { _eventQueue.add(evt); } @@ -113,14 +100,14 @@ public class Job implements ReadWriteRunnable int i = _maxEvents; while( --i != 0 ) { - Event e = _eventQueue.poll(); + Runnable e = _eventQueue.poll(); if (e == null) { return true; } else { - e.process(); + e.run(); } } return false; @@ -162,11 +149,11 @@ public class Job implements ReadWriteRunnable if(processAll()) { deactivate(); - _completionHandler.completed(this); + completed(); } else { - _completionHandler.notCompleted(this); + notCompleted(); } } @@ -174,19 +161,6 @@ public class Job implements ReadWriteRunnable { return _readJob; } - - /** - * Another interface for a continuation. - * - * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask, - * Runnable or a custom Continuation interface. - */ - static interface JobCompletionHandler - { - public void completed(Job job); - - public void notCompleted(final Job job); - } /** * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. @@ -194,7 +168,7 @@ public class Job implements ReadWriteRunnable * @param job The job. * @param event The event to hand off asynchronously. */ - public static void fireAsynchEvent(ExecutorService pool, Job job, Event event) + public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event) { job.add(event); @@ -221,4 +195,59 @@ public class Job implements ReadWriteRunnable } + /** + * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing + * of a batch of events this is called. This method simply re-activates the job, if it has more events to process. + * + * @param session The Mina session to work in. + * @param job The job that completed. + */ + public void completed() + { + if (!isComplete()) + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + + // ritchiem : 2006-12-13 Do we need to perform the additional checks here? + // Can the pool be shutdown at this point? + if (activate()) + { + try + { + pool.execute(this); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + + } + } + } + + public void notCompleted() + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + try + { + pool.execute(this); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java deleted file mode 100644 index 4e02ac3a55..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ /dev/null @@ -1,487 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.pool; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; - -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoFilterAdapter; -import org.apache.mina.common.IoSession; -import org.apache.qpid.pool.Event.CloseEvent; -import org.apache.qpid.pool.Event.MinaReceivedEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it - * adds no behaviour by default to the filter chain, it is abstract. - * - *

PoolingFilter provides a capability, available to sub-classes, to handle events in the chain asynchronously, by - * adding them to a job. If a job is not active, adding an event to it activates it. If it is active, the event is - * added to the job, which will run to completion and eventually process the event. The queue on the job itself acts as - * a buffer between stages of the pipeline. - * - *

There are two convenience methods, {@link #createAynschReadPoolingFilter} and - * {@link #createAynschWritePoolingFilter}, for obtaining pooling filters that handle 'messageReceived' and - * 'filterWrite' events, making it possible to process these event streams seperately. - * - *

Pooling filters have a name, in order to distinguish different filter types. They set up a {@link Job} on the - * Mina session they are working with, and store it in the session against their identifying name. This allows different - * filters with different names to be set up on the same filter chain, on the same Mina session, that batch their - * workloads in different jobs. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Implement default, pass through filter. - *
Create pooling filters and a specific thread pool. {@link ReferenceCountingExecutorService} - *
Provide the ability to batch Mina events for asynchronous processing. {@link Job}, {@link Event} - *
Provide a terminal continuation to keep jobs running till empty. - * {@link Job}, {@link Job.JobCompletionHandler} - *
- * - * @todo The static helper methods are pointless. Could just call new. - */ -public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler -{ - /** Used for debugging purposes. */ - private static final Logger _logger = LoggerFactory.getLogger(PoolingFilter.class); - - /** Holds the managed reference to obtain the executor for the batched jobs. */ - private final ReferenceCountingExecutorService _poolReference; - - /** Used to hold a name for identifying differeny pooling filter types. */ - private final String _name; - - /** Defines the maximum number of events that will be batched into a single job. */ - public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); - - private final int _maxEvents; - - private final boolean _readFilter; - - /** - * Creates a named pooling filter, on the specified shared thread pool. - * - * @param refCountingPool The thread pool reference. - * @param name The identifying name of the filter type. - */ - public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter) - { - _poolReference = refCountingPool; - _name = name; - _maxEvents = maxEvents; - _readFilter = readFilter; - } - - /** - * Helper method to get an instance of a pooling filter that handles read events asynchronously. - * - * @param refCountingPool A managed reference to the thread pool. - * @param name The filter types identifying name. - * - * @return A pooling filter for asynchronous read events. - */ - public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) - { - return new AsynchReadPoolingFilter(refCountingPool, name); - } - - /** - * Helper method to get an instance of a pooling filter that handles write events asynchronously. - * - * @param refCountingPool A managed reference to the thread pool. - * @param name The filter types identifying name. - * - * @return A pooling filter for asynchronous write events. - */ - public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) - { - return new AsynchWritePoolingFilter(refCountingPool, name); - } - - /** - * Called by Mina to initialize this filter. Takes a reference to the thread pool. - */ - public void init() - { - _logger.debug("Init called on PoolingFilter " + toString()); - - // Called when the filter is initialised in the chain. If the reference count is - // zero this acquire will initialise the pool. - _poolReference.acquireExecutorService(); - } - - /** - * Called by Mina to clean up this filter. Releases the reference to the thread pool. - */ - public void destroy() - { - _logger.debug("Destroy called on PoolingFilter " + toString()); - } - - /** - * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. - * - * @param job The job. - * @param event The event to hand off asynchronously. - */ - void fireAsynchEvent(Job job, Event event) - { - - job.add(event); - - final ExecutorService pool = _poolReference.getPool(); - - if(pool == null) - { - return; - } - - // rather than perform additional checks on pool to check that it hasn't shutdown. - // catch the RejectedExecutionException that will result from executing on a shutdown pool - if (job.activate()) - { - try - { - pool.execute(job); - } - catch(RejectedExecutionException e) - { - _logger.warn("Thread pool shutdown while tasks still outstanding"); - } - } - - } - - /** - * Creates a Job on the Mina session, identified by this filters name, in which this filter places asynchronously - * handled events. - * - * @param session The Mina session. - */ - public void createNewJobForSession(IoSession session) - { - Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter); - session.setAttribute(_name, job); - } - - /** - * Retrieves this filters Job, by this filters name, from the Mina session. - * - * @param session The Mina session. - * - * @return The Job for this filter to place asynchronous events into. - */ - public Job getJobForSession(IoSession session) - { - return (Job) session.getAttribute(_name); - } - - /** - * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing - * of a batch of events this is called. This method simply re-activates the job, if it has more events to process. - * - * @param session The Mina session to work in. - * @param job The job that completed. - */ - public void completed(Job job) - { - - - if (!job.isComplete()) - { - final ExecutorService pool = _poolReference.getPool(); - - if(pool == null) - { - return; - } - - - // ritchiem : 2006-12-13 Do we need to perform the additional checks here? - // Can the pool be shutdown at this point? - if (job.activate()) - { - try - { - pool.execute(job); - } - catch(RejectedExecutionException e) - { - _logger.warn("Thread pool shutdown while tasks still outstanding"); - } - - } - } - } - - public void notCompleted(Job job) - { - final ExecutorService pool = _poolReference.getPool(); - - if(pool == null) - { - return; - } - - try - { - pool.execute(job); - } - catch(RejectedExecutionException e) - { - _logger.warn("Thread pool shutdown while tasks still outstanding"); - } - - } - - - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception - { - nextFilter.sessionOpened(session); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception - { - nextFilter.sessionClosed(session); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param status The session idle status. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void sessionIdle(final NextFilter nextFilter, final IoSession session, final IdleStatus status) throws Exception - { - nextFilter.sessionIdle(session, status); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param cause The underlying exception. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void exceptionCaught(final NextFilter nextFilter, final IoSession session, final Throwable cause) throws Exception - { - nextFilter.exceptionCaught(session, cause); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param message The message received. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception - { - nextFilter.messageReceived(session, message); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param message The message sent. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void messageSent(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception - { - nextFilter.messageSent(session, message); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param writeRequest The write request event. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) - throws Exception - { - nextFilter.filterWrite(session, writeRequest); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void filterClose(NextFilter nextFilter, IoSession session) throws Exception - { - nextFilter.filterClose(session); - } - - /** - * No-op pass through filter to the next filter in the chain. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * - * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow - * overriding sub-classes the ability to. - */ - public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception - { - nextFilter.sessionCreated(session); - } - - /** - * Prints the filter types identifying name to a string, mainly for debugging purposes. - * - * @return The filter types identifying name. - */ - public String toString() - { - return _name; - } - - /** - * AsynchReadPoolingFilter is a pooling filter that handles 'messageReceived' and 'sessionClosed' events - * asynchronously. - */ - public static class AsynchReadPoolingFilter extends PoolingFilter - { - /** - * Creates a pooling filter that handles read events asynchronously. - * - * @param refCountingPool A managed reference to the thread pool. - * @param name The filter types identifying name. - */ - public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) - { - super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true); - } - - /** - * Hands off this event for asynchronous execution. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param message The message received. - */ - public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) - { - Job job = getJobForSession(session); - fireAsynchEvent(job, new MinaReceivedEvent(nextFilter, message, session)); - } - - /** - * Hands off this event for asynchronous execution. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - */ - public void sessionClosed(final NextFilter nextFilter, final IoSession session) - { - Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter, session)); - } - } - - /** - * AsynchWritePoolingFilter is a pooling filter that handles 'filterWrite' and 'sessionClosed' events - * asynchronously. - */ - public static class AsynchWritePoolingFilter extends PoolingFilter - { - /** - * Creates a pooling filter that handles write events asynchronously. - * - * @param refCountingPool A managed reference to the thread pool. - * @param name The filter types identifying name. - */ - public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) - { - super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false); - } - - /** - * Hands off this event for asynchronous execution. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - * @param writeRequest The write request event. - */ - public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) - { - Job job = getJobForSession(session); - fireAsynchEvent(job, new Event.MinaWriteEvent(nextFilter, writeRequest, session)); - } - - /** - * Hands off this event for asynchronous execution. - * - * @param nextFilter The next filter in the chain. - * @param session The Mina session. - */ - public void sessionClosed(final NextFilter nextFilter, final IoSession session) - { - Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter, session)); - } - } - -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java deleted file mode 100644 index 8cea70e597..0000000000 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.pool; - -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.ThreadModel; -import org.apache.mina.filter.ReferenceCountingIoFilter; - -/** - * ReadWriteThreadModel is a Mina i/o filter chain factory, which creates a filter chain with seperate filters to - * handle read and write events. The seperate filters are {@link PoolingFilter}s, which have thread pools to handle - * these events. The effect of this is that reading and writing may happen concurrently. - * - *

Socket i/o will only happen with concurrent reads and writes if Mina has seperate selector threads for each. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Create a filter chain with seperate read and write thread pools for read/write Mina events. - * {@link PoolingFilter} - *
- */ -public class ReadWriteThreadModel implements ThreadModel -{ - /** Holds the singleton instance of this factory. */ - private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel(); - - /** Holds the thread pooling filter for reads. */ - private final PoolingFilter _asynchronousReadFilter; - - /** Holds the thread pooloing filter for writes. */ - private final PoolingFilter _asynchronousWriteFilter; - - /** - * Creates a new factory for concurrent i/o, thread pooling filter chain construction. This is private, so that - * only a singleton instance of the factory is ever created. - */ - private ReadWriteThreadModel() - { - final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); - _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter"); - _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter"); - } - - /** - * Gets the singleton instance of this filter chain factory. - * - * @return The singleton instance of this filter chain factory. - */ - public static ReadWriteThreadModel getInstance() - { - return _instance; - } - - /** - * Gets the read filter. - * - * @return The read filter. - */ - public PoolingFilter getAsynchronousReadFilter() - { - return _asynchronousReadFilter; - } - - /** - * Gets the write filter. - * - * @return The write filter. - */ - public PoolingFilter getAsynchronousWriteFilter() - { - return _asynchronousWriteFilter; - } - - /** - * Adds the concurrent read and write filters to a filter chain. - * - * @param chain The Mina filter chain to add to. - */ - public void buildFilterChain(IoFilterChain chain) - { - chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter)); - chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter)); - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index 7cc5f8e442..38ea9307b7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -28,8 +28,6 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import javax.net.ssl.SSLEngine; - import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoAcceptor; @@ -50,7 +48,6 @@ import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.util.NewThreadExecutor; import org.apache.mina.util.SessionUtil; -import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.ssl.SSLContextFactory; @@ -58,7 +55,6 @@ import org.apache.qpid.thread.QpidThreadExecutor; import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.NetworkDriverConfiguration; import org.apache.qpid.transport.OpenException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,14 +144,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver sc.setTcpNoDelay(config.getTcpNoDelay()); } - // if we do not use the executor pool threading model we get the default - // leader follower - // implementation provided by MINA - if (_executorPool) - { - sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); - } - if (sslFactory != null) { _sslFactory = sslFactory; @@ -227,14 +215,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver } SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig(); - - // if we do not use our own thread model we get the MINA default which is to use - // its own leader-follower model - boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); - if (readWriteThreading) - { - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); - } SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true); @@ -258,8 +238,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver throw new OpenException("Could not open connection", _lastException); } _ioSession = future.getSession(); - ReadWriteThreadModel.getInstance().getAsynchronousReadFilter().createNewJobForSession(_ioSession); - ReadWriteThreadModel.getInstance().getAsynchronousWriteFilter().createNewJobForSession(_ioSession); _ioSession.setAttachment(engine); engine.setNetworkDriver(this); _protocolEngine = engine; -- cgit v1.2.1 From 788f96fd8af146cba5bff57300b1a513988c34b9 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Fri, 18 Sep 2009 12:55:40 +0000 Subject: Fix bind error message git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@816614 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/transport/network/mina/MINANetworkDriver.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'qpid/java/common/src/main') diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java index 38ea9307b7..b0d1c46572 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java @@ -159,7 +159,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver } catch (IOException e) { - throw new BindException(String.format("Could not bind to {0}:{2}", addr, port)); + throw new BindException(String.format("Could not bind to %1s:%2s", addr, port)); } } } @@ -171,7 +171,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver } catch (IOException e) { - throw new BindException(String.format("Could not bind to *:{1}", port)); + throw new BindException(String.format("Could not bind to *:%1s", port)); } } _acceptingConnections = true; -- cgit v1.2.1 From 98cc985dbd81a84cd0b0a969c57cb941680ec81f Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Sun, 11 Oct 2009 23:22:08 +0000 Subject: Merge from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@824198 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java | 1 - 1 file changed, 1 deletion(-) (limited to 'qpid/java/common/src/main') diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java b/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java index 3c1ea22595..7d8a5b7b36 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/ConsoleOutput.java @@ -51,7 +51,6 @@ public class ConsoleOutput implements Sender System.out.println("CLOSED"); } - @Override public void setIdleTimeout(long l) { // TODO Auto-generated method stub -- cgit v1.2.1 From e249f157f5963cfc458eca1988fb970f086ced72 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Tue, 30 Dec 2014 16:48:57 +0000 Subject: QPID-6293: [Java Broker] Log Java Broker's pid on startup * Log the process identifer on startup as an operational log message * Wired up the Broker attribute Broker#processPid git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1648545 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/properties/ConnectionStartProperties.java | 27 +++--------- .../java/org/apache/qpid/util/SystemUtils.java | 48 +++++++++++++++++++++- 2 files changed, 53 insertions(+), 22 deletions(-) (limited to 'qpid/java/common/src/main') diff --git a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java index 4f88fe7071..3569b4b460 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java @@ -20,9 +20,6 @@ */ package org.apache.qpid.properties; -import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; - import org.apache.qpid.transport.util.Logger; import org.apache.qpid.util.SystemUtils; @@ -62,30 +59,18 @@ public class ConnectionStartProperties public static final String QPID_CONFIRMED_PUBLISH_SUPPORTED = "qpid.confirmed_publish_supported"; - public static int _pid; + public static final int _pid; public static final String _platformInfo; static { - RuntimeMXBean rtb = ManagementFactory.getRuntimeMXBean(); - String processName = rtb.getName(); - if (processName != null && processName.indexOf('@') > 0) - { - try - { - _pid = Integer.parseInt(processName.substring(0,processName.indexOf('@'))); - } - catch(Exception e) - { - LOGGER.warn("Unable to get the PID due to error",e); - _pid = -1; - } - } - else + + _pid = SystemUtils.getProcessPidAsInt(); + + if (_pid == -1) { - LOGGER.warn("Unable to get the PID due to unsupported format : " + processName); - _pid = -1; + LOGGER.warn("Unable to get the process's PID"); } StringBuilder fullSystemInfo = new StringBuilder(System.getProperty("java.runtime.name")); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/SystemUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/SystemUtils.java index 55c7ae9b96..5825276760 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/SystemUtils.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/SystemUtils.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.util; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; + /** * SystemUtils provides some simple helper methods for working with the current * Operating System. @@ -38,9 +41,29 @@ public class SystemUtils private static final String _osName = System.getProperty("os.name", UNKNOWN_OS); private static final String _osVersion = System.getProperty("os.version", UNKNOWN_VERSION); private static final String _osArch = System.getProperty("os.arch", UNKNOWN_ARCH); - private static final boolean _isWindows = _osName.toLowerCase().contains("windows"); + /** Process identifier of underlying process or null if it cannot be determined */ + private static final String _osPid; + private static int _osPidInt; + + static + { + RuntimeMXBean rtb = ManagementFactory.getRuntimeMXBean(); + String processName = rtb.getName(); + int atIndex; + if(processName != null && (atIndex = processName.indexOf('@')) > 0) + { + _osPid = processName.substring(0, atIndex); + _osPidInt = parseInt(_osPid, -1); + } + else + { + _osPid = null; + } + } + + private SystemUtils() { } @@ -60,6 +83,16 @@ public class SystemUtils return _osArch; } + public final static String getProcessPid() + { + return _osPid; + } + + public final static int getProcessPidAsInt() + { + return _osPidInt; + } + public final static boolean isWindows() { return _isWindows; @@ -78,4 +111,17 @@ public class SystemUtils { return _osName + " " + _osVersion + " " + _osArch; } + + private static int parseInt(String str, int defaultVal) + { + try + { + return Integer.parseInt(str); + } + catch(NumberFormatException e) + { + return defaultVal; + } + } + } -- cgit v1.2.1 From 9b1d37a0cbef71478b58c6acee4f72a2474a9f7d Mon Sep 17 00:00:00 2001 From: Andrew MacBean Date: Wed, 14 Jan 2015 10:38:04 +0000 Subject: QPID-6304: [Java Broker] Allow truststore and keystore (JKS) files to be stored as a data:// URL inside the config * Added truststore/keystore unit tests too to cover both new and (most of) the existing functionality, retiring the equivilent slower REST system tests. * Added single REST test exercising the creation of a keystore/teststore from data:// URL. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1651615 13f79535-47bb-0310-9956-ffa450edef68 --- .../security/ssl/QpidClientX509KeyManager.java | 11 ++++ .../transport/network/security/ssl/SSLUtil.java | 18 ++++++ .../java/org/apache/qpid/util/DataUrlUtils.java | 32 +++++++++++ .../main/java/org/apache/qpid/util/FileUtils.java | 64 +++++++++------------- 4 files changed, 87 insertions(+), 38 deletions(-) create mode 100644 qpid/java/common/src/main/java/org/apache/qpid/util/DataUrlUtils.java (limited to 'qpid/java/common/src/main') diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java index 0dccf37979..c61684e2bb 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/QpidClientX509KeyManager.java @@ -27,6 +27,7 @@ import javax.net.ssl.SSLEngine; import javax.net.ssl.X509ExtendedKeyManager; import java.io.IOException; import java.net.Socket; +import java.net.URL; import java.security.GeneralSecurityException; import java.security.KeyStore; import java.security.Principal; @@ -50,6 +51,16 @@ public class QpidClientX509KeyManager extends X509ExtendedKeyManager this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0]; } + public QpidClientX509KeyManager(String alias, URL keyStoreUrl, String keyStoreType, + String keyStorePassword, String keyManagerFactoryAlgorithmName) throws GeneralSecurityException, IOException + { + this.alias = alias; + KeyStore ks = SSLUtil.getInitializedKeyStore(keyStoreUrl,keyStorePassword,keyStoreType); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithmName); + kmf.init(ks, keyStorePassword.toCharArray()); + this.delegate = (X509ExtendedKeyManager)kmf.getKeyManagers()[0]; + } + public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) { log.debug("chooseClientAlias:Returning alias " + alias); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java index 98229fd2a1..b6ae2ab4a3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLUtil.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.net.URL; import java.security.GeneralSecurityException; import java.security.KeyStore; import java.security.Principal; @@ -248,6 +249,23 @@ public class SSLUtil return ks; } + public static KeyStore getInitializedKeyStore(URL storePath, String storePassword, String keyStoreType) throws GeneralSecurityException, IOException + { + KeyStore ks = KeyStore.getInstance(keyStoreType); + try(InputStream in = storePath.openStream()) + { + if (in == null && !"PKCS11".equalsIgnoreCase(keyStoreType)) // PKCS11 will not require an explicit path + { + throw new IOException("Unable to load keystore resource: " + storePath); + } + + char[] storeCharPassword = storePassword == null ? null : storePassword.toCharArray(); + + ks.load(in, storeCharPassword); + } + return ks; + } + public static void removeSSLv3Support(final SSLEngine engine) { List enabledProtocols = Arrays.asList(engine.getEnabledProtocols()); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/DataUrlUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/DataUrlUtils.java new file mode 100644 index 0000000000..16c5012d88 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/DataUrlUtils.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.util; + +import javax.xml.bind.DatatypeConverter; + +public class DataUrlUtils +{ + public static String getDataUrlForBytes(final byte[] bytes) + { + StringBuilder inlineURL = new StringBuilder("data:;base64,"); + inlineURL.append(DatatypeConverter.printBase64Binary(bytes)); + return inlineURL.toString(); + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java index dd347b54eb..70607f49db 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java @@ -22,6 +22,7 @@ package org.apache.qpid.util; import java.io.BufferedInputStream; import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -51,39 +52,32 @@ public class FileUtils * * @return The contents of the file. */ - public static String readFileAsString(String filename) + public static byte[] readFileAsBytes(String filename) { - BufferedInputStream is = null; - try + try(BufferedInputStream is = new BufferedInputStream(new FileInputStream(filename))) { - try - { - is = new BufferedInputStream(new FileInputStream(filename)); - } - catch (FileNotFoundException e) - { - throw new RuntimeException(e); - } - return readStreamAsString(is); } - finally + catch (IOException e) { - if (is != null) - { - try - { - is.close(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } + throw new RuntimeException(e); } } + + /** + * Reads a text file as a string. + * + * @param filename The name of the file. + * + * @return The contents of the file. + */ + public static String readFileAsString(String filename) + { + return new String(readFileAsBytes(filename)); + } + /** * Reads a text file as a string. * @@ -93,18 +87,15 @@ public class FileUtils */ public static String readFileAsString(File file) { - BufferedInputStream is = null; - - try + try(BufferedInputStream is = new BufferedInputStream(new FileInputStream(file))) { - is = new BufferedInputStream(new FileInputStream(file)); + + return new String(readStreamAsString(is)); } - catch (FileNotFoundException e) + catch (IOException e) { throw new RuntimeException(e); } - - return readStreamAsString(is); } /** @@ -115,23 +106,20 @@ public class FileUtils * * @return The contents of the reader. */ - private static String readStreamAsString(BufferedInputStream is) + private static byte[] readStreamAsString(BufferedInputStream is) { - try + try(ByteArrayOutputStream inBuffer = new ByteArrayOutputStream()) { byte[] data = new byte[4096]; - StringBuffer inBuffer = new StringBuffer(); - int read; while ((read = is.read(data)) != -1) { - String s = new String(data, 0, read); - inBuffer.append(s); + inBuffer.write(data, 0, read); } - return inBuffer.toString(); + return inBuffer.toByteArray(); } catch (IOException e) { -- cgit v1.2.1