diff options
Diffstat (limited to 'java/common/src/test')
21 files changed, 1191 insertions, 1425 deletions
diff --git a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java deleted file mode 100644 index b93dc46741..0000000000 --- a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java +++ /dev/null @@ -1,396 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.mina.SocketIOTest; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.filter.ReadThrottleFilterBuilder; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.CountDownLatch; - -public class IOWriterClient implements Runnable -{ - private static final Logger _logger = LoggerFactory.getLogger(IOWriterClient.class); - - public static int DEFAULT_TEST_SIZE = 2; - - private IoSession _session; - - private long _startTime; - - private long[] _chunkTimes; - - public int _chunkCount = 200000; - - private int _chunkSize = 1024; - - private CountDownLatch _notifier; - - private int _maximumWriteQueueLength; - - static public int _PORT = IOWriterServer._PORT; - - public void run() - { - _logger.info("Starting to send " + _chunkCount + " buffers of " + _chunkSize + "B"); - _startTime = System.currentTimeMillis(); - _notifier = new CountDownLatch(1); - - for (int i = 0; i < _chunkCount; i++) - { - ByteBuffer buf = ByteBuffer.allocate(_chunkSize, false); - byte check = (byte) (i % 128); - buf.put(check); - buf.fill((byte) 88, buf.remaining()); - buf.flip(); - - _session.write(buf); - } - - long _sentall = System.currentTimeMillis(); - long _receivedall = _sentall; - try - { - _logger.info("All buffers sent; waiting for receipt from server"); - _notifier.await(); - _receivedall = System.currentTimeMillis(); - } - catch (InterruptedException e) - { - //Ignore - } - _logger.info("Completed"); - _logger.info("Total time waiting for server after last write: " + (_receivedall - _sentall)); - - long totalTime = System.currentTimeMillis() - _startTime; - - _logger.info("Total time: " + totalTime); - _logger.info("MB per second: " + (int) ((1.0 * _chunkSize * _chunkCount) / totalTime)); - long lastChunkTime = _startTime; - double average = 0; - for (int i = 0; i < _chunkTimes.length; i++) - { - if (i == 0) - { - average = _chunkTimes[i] - _startTime; - } - else - { - long delta = _chunkTimes[i] - lastChunkTime; - if (delta != 0) - { - average = (average + delta) / 2; - } - } - lastChunkTime = _chunkTimes[i]; - } - _logger.info("Average chunk time: " + average + "ms"); - _logger.info("Maximum WriteRequestQueue size: " + _maximumWriteQueueLength); - - CloseFuture cf = _session.close(); - _logger.info("Closing session"); - cf.join(); - } - - private class WriterHandler extends IoHandlerAdapter - { - private int _chunksReceived = 0; - - private int _partialBytesRead = 0; - - private byte _partialCheckNumber; - - private int _totalBytesReceived = 0; - - private int _receivedCount = 0; - private int _sentCount = 0; - private static final String DEFAULT_READ_BUFFER = "262144"; - private static final String DEFAULT_WRITE_BUFFER = "262144"; - - public void sessionCreated(IoSession session) throws Exception - { - IoFilterChain chain = session.getFilterChain(); - - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER))); - readfilter.attach(chain); - - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - - writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER))); - - writefilter.attach(chain); - } - - public void messageSent(IoSession session, Object message) throws Exception - { - _maximumWriteQueueLength = Math.max(session.getScheduledWriteRequests(), _maximumWriteQueueLength); - - if (_logger.isDebugEnabled()) - { - ++_sentCount; - if (_sentCount % 1000 == 0) - { - _logger.debug("Sent count " + _sentCount + ":WQueue" + session.getScheduledWriteRequests()); - - } - } - } - - public void messageReceived(IoSession session, Object message) throws Exception - { - if (_logger.isDebugEnabled()) - { - ++_receivedCount; - - if (_receivedCount % 1000 == 0) - { - _logger.debug("Receieved count " + _receivedCount); - } - } - - ByteBuffer result = (ByteBuffer) message; - _totalBytesReceived += result.remaining(); - int size = result.remaining(); - long now = System.currentTimeMillis(); - if (_partialBytesRead > 0) - { - int offset = _chunkSize - _partialBytesRead; - if (size >= offset) - { - _chunkTimes[_chunksReceived++] = now; - result.position(offset); - } - else - { - // have not read even one chunk, including the previous partial bytes - _partialBytesRead += size; - return; - } - } - - - int chunkCount = result.remaining() / _chunkSize; - - for (int i = 0; i < chunkCount; i++) - { - _chunkTimes[_chunksReceived++] = now; - byte check = result.get(); - _logger.debug("Check number " + check + " read"); - if (check != (byte) ((_chunksReceived - 1) % 128)) - { - _logger.error("Check number " + check + " read when expected " + (_chunksReceived % 128)); - } - _logger.debug("Chunk times recorded"); - - try - { - result.skip(_chunkSize - 1); - } - catch (IllegalArgumentException e) - { - _logger.error("Position was: " + result.position()); - _logger.error("Tried to skip to: " + (_chunkSize * i)); - _logger.error("limit was; " + result.limit()); - } - } - _logger.debug("Chunks received now " + _chunksReceived); - _logger.debug("Bytes received: " + _totalBytesReceived); - _partialBytesRead = result.remaining(); - - if (_partialBytesRead > 0) - { - _partialCheckNumber = result.get(); - } - - - if (_chunksReceived >= _chunkCount) - { - _notifier.countDown(); - } - - } - - public void exceptionCaught(IoSession session, Throwable cause) throws Exception - { - _logger.error("Error: " + cause, cause); - } - } - - public void startWriter() throws IOException, InterruptedException - { - - _maximumWriteQueueLength = 0; - - IoConnector ioConnector = null; - - if (Boolean.getBoolean("multinio")) - { - _logger.warn("Using MultiThread NIO"); - ioConnector = new org.apache.mina.transport.socket.nio.MultiThreadSocketConnector(); - } - else - { - _logger.warn("Using MINA NIO"); - ioConnector = new org.apache.mina.transport.socket.nio.SocketConnector(); - } - - SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getDefaultConfig().getSessionConfig(); - scfg.setTcpNoDelay(true); - scfg.setSendBufferSize(32768); - scfg.setReceiveBufferSize(32768); - - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - - - final InetSocketAddress address = new InetSocketAddress("localhost", _PORT); - _logger.info("Attempting connection to " + address); - - //Old mina style -// ioConnector.setHandler(new WriterHandler()); -// ConnectFuture future = ioConnector.connect(address); - ConnectFuture future = ioConnector.connect(address, new WriterHandler()); - // wait for connection to complete - future.join(); - _logger.info("Connection completed"); - // we call getSession which throws an IOException if there has been an error connecting - _session = future.getSession(); - - _chunkTimes = new long[_chunkCount]; - Thread t = new Thread(this); - t.start(); - t.join(); - _logger.info("Test Complete"); - } - - - public void test1k() throws IOException, InterruptedException - { - _logger.info("Starting 1k test"); - _chunkSize = 1024; - startWriter(); - } - - - public void test2k() throws IOException, InterruptedException - { - _logger.info("Starting 2k test"); - _chunkSize = 2048; - startWriter(); - } - - - public void test4k() throws IOException, InterruptedException - { - _logger.info("Starting 4k test"); - _chunkSize = 4096; - startWriter(); - } - - - public void test8k() throws IOException, InterruptedException - { - _logger.info("Starting 8k test"); - _chunkSize = 8192; - startWriter(); - } - - - public void test16k() throws IOException, InterruptedException - { - _logger.info("Starting 16k test"); - _chunkSize = 16384; - startWriter(); - } - - - public void test32k() throws IOException, InterruptedException - { - _logger.info("Starting 32k test"); - _chunkSize = 32768; - startWriter(); - } - - - public static int getIntArg(String[] args, int index, int defaultValue) - { - if (args.length > index) - { - try - { - return Integer.parseInt(args[index]); - } - catch (NumberFormatException e) - { - //Do nothing - } - } - return defaultValue; - } - - public static void main(String[] args) throws IOException, InterruptedException - { - _PORT = getIntArg(args, 0, _PORT); - - int test = getIntArg(args, 1, DEFAULT_TEST_SIZE); - - IOWriterClient w = new IOWriterClient(); - w._chunkCount = getIntArg(args, 2, w._chunkCount); - switch (test) - { - case 0: - w.test1k(); - w.test2k(); - w.test4k(); - w.test8k(); - w.test16k(); - w.test32k(); - break; - case 1: - w.test1k(); - break; - case 2: - w.test2k(); - break; - case 4: - w.test4k(); - break; - case 8: - w.test8k(); - break; - case 16: - w.test16k(); - break; - case 32: - w.test32k(); - break; - } - } -} diff --git a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java deleted file mode 100644 index 423e98c67b..0000000000 --- a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.mina.SocketIOTest; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.filter.ReadThrottleFilterBuilder; -import org.apache.mina.filter.WriteBufferLimitFilterBuilder; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; - -/** Tests MINA socket performance. This acceptor simply reads data from the network and writes it back again. */ -public class IOWriterServer -{ - private static final Logger _logger = LoggerFactory.getLogger(IOWriterServer.class); - - static public int _PORT = 9999; - - private static final String DEFAULT_READ_BUFFER = "262144"; - private static final String DEFAULT_WRITE_BUFFER = "262144"; - - - private static class TestHandler extends IoHandlerAdapter - { - private int _sentCount = 0; - - private int _bytesSent = 0; - - private int _receivedCount = 0; - - public void sessionCreated(IoSession ioSession) throws java.lang.Exception - { - IoFilterChain chain = ioSession.getFilterChain(); - - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER))); - readfilter.attach(chain); - - WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - - writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER))); - - writefilter.attach(chain); - - } - - public void messageReceived(IoSession session, Object message) throws Exception - { - ((ByteBuffer) message).acquire(); - session.write(message); - - if (_logger.isDebugEnabled()) - { - _bytesSent += ((ByteBuffer) message).remaining(); - - _sentCount++; - - if (_sentCount % 1000 == 0) - { - _logger.debug("Bytes sent: " + _bytesSent); - } - } - } - - public void messageSent(IoSession session, Object message) throws Exception - { - if (_logger.isDebugEnabled()) - { - ++_receivedCount; - - if (_receivedCount % 1000 == 0) - { - _logger.debug("Receieved count " + _receivedCount); - } - } - } - - public void exceptionCaught(IoSession session, Throwable cause) throws Exception - { - _logger.error("Error: " + cause, cause); - } - } - - public void startAcceptor() throws IOException - { - IoAcceptor acceptor; - if (Boolean.getBoolean("multinio")) - { - _logger.warn("Using MultiThread NIO"); - acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(); - } - else - { - _logger.warn("Using MINA NIO"); - acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(); - } - - - SocketSessionConfig sc = (SocketSessionConfig) acceptor.getDefaultConfig().getSessionConfig(); - sc.setTcpNoDelay(true); - sc.setSendBufferSize(32768); - sc.setReceiveBufferSize(32768); - - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - - //The old mina style -// acceptor.setLocalAddress(new InetSocketAddress(_PORT)); -// acceptor.setHandler(new TestHandler()); -// acceptor.bind(); - acceptor.bind(new InetSocketAddress(_PORT), new TestHandler()); - - _logger.info("Bound on port " + _PORT + ":" + _logger.isDebugEnabled()); - _logger.debug("debug on"); - } - - public static void main(String[] args) throws IOException - { - - if (args.length > 0) - { - try - { - _PORT = Integer.parseInt(args[0]); - } - catch (NumberFormatException e) - { - //IGNORE so use default port 9999; - } - } - - IOWriterServer a = new IOWriterServer(); - a.startAcceptor(); - } -} diff --git a/java/common/src/test/java/org/apache/qpid/AMQExceptionTest.java b/java/common/src/test/java/org/apache/qpid/AMQExceptionTest.java index ef6cd41492..f65427e583 100644 --- a/java/common/src/test/java/org/apache/qpid/AMQExceptionTest.java +++ b/java/common/src/test/java/org/apache/qpid/AMQExceptionTest.java @@ -23,6 +23,7 @@ package org.apache.qpid; import junit.framework.TestCase; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQShortString; /** * This test is to ensure that when an AMQException is rethrown that the specified exception is correctly wrapped up. @@ -91,6 +92,18 @@ public class AMQExceptionTest extends TestCase return amqe; } + public void testGetMessageAsString() + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 25; i++) + { + sb.append("message [" + i + "]"); + } + AMQException e = new AMQException(AMQConstant.INTERNAL_ERROR, sb.toString(), null); + AMQShortString message = e.getMessageAsShortString(); + assertEquals(sb.substring(0, AMQShortString.MAX_LENGTH - 3) + "...", message.toString()); + } + /** * Private class that extends AMQException but does not have a default exception. */ diff --git a/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java b/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java index 62e25e7d79..272eb75800 100644 --- a/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java +++ b/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java @@ -21,6 +21,9 @@ package org.apache.qpid.codec; */ +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -46,9 +49,16 @@ public class AMQDecoderTest extends TestCase } - public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + private ByteBuffer getHeartbeatBodyBuffer() throws IOException { - ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + HeartbeatBody.FRAME.writePayload(new DataOutputStream(baos)); + return ByteBuffer.wrap(baos.toByteArray()); + } + + public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException + { + ByteBuffer msg = getHeartbeatBodyBuffer(); ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg); if (frames.get(0) instanceof AMQFrame) { @@ -60,9 +70,9 @@ public class AMQDecoderTest extends TestCase } } - public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException { - ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msg = getHeartbeatBodyBuffer(); ByteBuffer msgA = msg.slice(); int msgbPos = msg.remaining() / 2; int msgaLimit = msg.remaining() - msgbPos; @@ -83,10 +93,10 @@ public class AMQDecoderTest extends TestCase } } - public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException { - ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer(); - ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgA = getHeartbeatBodyBuffer(); + ByteBuffer msgB = getHeartbeatBodyBuffer(); ByteBuffer msg = ByteBuffer.allocate(msgA.remaining() + msgB.remaining()); msg.put(msgA); msg.put(msgB); @@ -106,11 +116,11 @@ public class AMQDecoderTest extends TestCase } } - public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException { - ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer(); - ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer(); - ByteBuffer msgC = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgA = getHeartbeatBodyBuffer(); + ByteBuffer msgB = getHeartbeatBodyBuffer(); + ByteBuffer msgC = getHeartbeatBodyBuffer(); ByteBuffer sliceA = ByteBuffer.allocate(msgA.remaining() + msgB.remaining() / 2); sliceA.put(msgA); diff --git a/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java b/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java index 92e7ce0a80..9a805d87b3 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java @@ -20,6 +20,10 @@ package org.apache.qpid.framing; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; + import junit.framework.TestCase; public class AMQShortStringTest extends TestCase { @@ -105,5 +109,215 @@ public class AMQShortStringTest extends TestCase assertFalse(new AMQShortString("A").equals(new AMQShortString("a"))); } + /** + * Test method for + * {@link org.apache.qpid.framing.AMQShortString#AMQShortString(byte[])}. + */ + public void testCreateAMQShortStringByteArray() + { + byte[] bytes = null; + try + { + bytes = "test".getBytes("UTF-8"); + } + catch (UnsupportedEncodingException e) + { + fail("UTF-8 encoding is not supported anymore by JVM:" + e.getMessage()); + } + AMQShortString string = new AMQShortString(bytes); + assertEquals("constructed amq short string length differs from expected", 4, string.length()); + assertTrue("constructed amq short string differs from expected", string.equals("test")); + } + + /** + * Test method for + * {@link org.apache.qpid.framing.AMQShortString#AMQShortString(java.lang.String)} + * <p> + * Tests short string construction from string with length less than 255. + */ + public void testCreateAMQShortStringString() + { + AMQShortString string = new AMQShortString("test"); + assertEquals("constructed amq short string length differs from expected", 4, string.length()); + assertTrue("constructed amq short string differs from expected", string.equals("test")); + } + + /** + * Test method for + * {@link org.apache.qpid.framing.AMQShortString#AMQShortString(char[])}. + * <p> + * Tests short string construction from char array with length less than 255. + */ + public void testCreateAMQShortStringCharArray() + { + char[] chars = "test".toCharArray(); + AMQShortString string = new AMQShortString(chars); + assertEquals("constructed amq short string length differs from expected", 4, string.length()); + assertTrue("constructed amq short string differs from expected", string.equals("test")); + } + + /** + * Test method for + * {@link org.apache.qpid.framing.AMQShortString#AMQShortString(java.lang.CharSequence)} + * <p> + * Tests short string construction from char sequence with length less than 255. + */ + public void testCreateAMQShortStringCharSequence() + { + AMQShortString string = new AMQShortString((CharSequence) "test"); + assertEquals("constructed amq short string length differs from expected", 4, string.length()); + assertTrue("constructed amq short string differs from expected", string.equals("test")); + } + + /** + * Test method for + * {@link org.apache.qpid.framing.AMQShortString#AMQShortString(byte[])}. + * <p> + * Tests an attempt to create an AMQP short string from byte array with length over 255. + */ + public void testCreateAMQShortStringByteArrayOver255() + { + String test = buildString('a', 256); + byte[] bytes = null; + try + { + bytes = test.getBytes("UTF-8"); + } + catch (UnsupportedEncodingException e) + { + fail("UTF-8 encoding is not supported anymore by JVM:" + e.getMessage()); + } + try + { + new AMQShortString(bytes); + fail("It should not be possible to create AMQShortString with length over 255"); + } + catch (IllegalArgumentException e) + { + assertEquals("Exception message differs from expected", + "Cannot create AMQShortString with number of octets over 255!", e.getMessage()); + } + } + + /** + * Test method for + * {@link org.apache.qpid.framing.AMQShortString#AMQShortString(java.lang.String)} + * <p> + * Tests an attempt to create an AMQP short string from string with length over 255 + */ + public void testCreateAMQShortStringStringOver255() + { + String test = buildString('a', 256); + try + { + new AMQShortString(test); + fail("It should not be possible to create AMQShortString with length over 255"); + } + catch (IllegalArgumentException e) + { + assertEquals("Exception message differs from expected", + "Cannot create AMQShortString with number of octets over 255!", e.getMessage()); + } + } + + /** + * Test method for + * {@link org.apache.qpid.framing.AMQShortString#AMQShortString(char[])}. + * <p> + * Tests an attempt to create an AMQP short string from char array with length over 255. + */ + public void testCreateAMQShortStringCharArrayOver255() + { + String test = buildString('a', 256); + char[] chars = test.toCharArray(); + try + { + new AMQShortString(chars); + fail("It should not be possible to create AMQShortString with length over 255"); + } + catch (IllegalArgumentException e) + { + assertEquals("Exception message differs from expected", + "Cannot create AMQShortString with number of octets over 255!", e.getMessage()); + } + } + + /** + * Test method for + * {@link org.apache.qpid.framing.AMQShortString#AMQShortString(java.lang.CharSequence)} + * <p> + * Tests an attempt to create an AMQP short string from char sequence with length over 255. + */ + public void testCreateAMQShortStringCharSequenceOver255() + { + String test = buildString('a', 256); + try + { + new AMQShortString((CharSequence) test); + fail("It should not be possible to create AMQShortString with length over 255"); + } + catch (IllegalArgumentException e) + { + assertEquals("Exception message differs from expected", + "Cannot create AMQShortString with number of octets over 255!", e.getMessage()); + } + } + + /** + * Tests joining of short strings into a short string with length over 255. + */ + public void testJoinOverflow() + { + List<AMQShortString> data = new ArrayList<AMQShortString>(); + for (int i = 0; i < 25; i++) + { + data.add(new AMQShortString("test data!")); + } + try + { + AMQShortString.join(data, new AMQShortString(" ")); + fail("It should not be possible to create AMQShortString with length over 255"); + } + catch (IllegalArgumentException e) + { + assertEquals("Exception message differs from expected", + "Cannot create AMQShortString with number of octets over 255!", e.getMessage()); + } + } + + /** + * Tests joining of short strings into a short string with length less than 255. + */ + public void testJoin() + { + StringBuilder expected = new StringBuilder(); + List<AMQShortString> data = new ArrayList<AMQShortString>(); + data.add(new AMQShortString("test data 1")); + expected.append("test data 1"); + data.add(new AMQShortString("test data 2")); + expected.append(" test data 2"); + AMQShortString result = AMQShortString.join(data, new AMQShortString(" ")); + assertEquals("join result differs from expected", expected.toString(), result.asString()); + } + + /** + * A helper method to generate a string with given length containing given + * character + * + * @param ch + * char to build string with + * @param length + * target string length + * @return string + */ + private String buildString(char ch, int length) + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < length; i++) + { + sb.append(ch); + } + return sb.toString(); + } } diff --git a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java index 4fd1f60d69..5e7783f492 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; - import junit.framework.TestCase; +import java.io.*; + public class BasicContentHeaderPropertiesTest extends TestCase { @@ -76,15 +76,14 @@ public class BasicContentHeaderPropertiesTest extends TestCase assertEquals(99, _testProperties.getPropertyFlags()); } - public void testWritePropertyListPayload() + public void testWritePropertyListPayload() throws IOException { - ByteBuffer buf = ByteBuffer.allocate(300); - _testProperties.writePropertyListPayload(buf); + _testProperties.writePropertyListPayload(new DataOutputStream(new ByteArrayOutputStream(300))); } public void testPopulatePropertiesFromBuffer() throws Exception { - ByteBuffer buf = ByteBuffer.allocate(300); + DataInputStream buf = new DataInputStream(new ByteArrayInputStream(new byte[300])); _testProperties.populatePropertiesFromBuffer(buf, 99, 99); } diff --git a/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java b/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java index d4691ba097..bb4c9c3884 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java @@ -23,14 +23,14 @@ package org.apache.qpid.framing; import junit.framework.Assert; import junit.framework.TestCase; -import org.apache.mina.common.ByteBuffer; - import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.AMQPInvalidClassException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.*; + public class PropertyFieldTableTest extends TestCase { private static final Logger _logger = LoggerFactory.getLogger(PropertyFieldTableTest.class); @@ -441,7 +441,7 @@ public class PropertyFieldTableTest extends TestCase } /** Check that a nested field table parameter correctly encodes and decodes to a byte buffer. */ - public void testNestedFieldTable() + public void testNestedFieldTable() throws IOException { byte[] testBytes = new byte[] { 0, 1, 2, 3, 4, 5 }; @@ -465,14 +465,16 @@ public class PropertyFieldTableTest extends TestCase outerTable.setFieldTable("innerTable", innerTable); // Write the outer table into the buffer. - final ByteBuffer buffer = ByteBuffer.allocate((int) outerTable.getEncodedSize() + 4); - outerTable.writeToBuffer(buffer); - buffer.flip(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + outerTable.writeToBuffer(new DataOutputStream(baos)); + + byte[] data = baos.toByteArray(); // Extract the table back from the buffer again. try { - FieldTable extractedOuterTable = EncodingUtils.readFieldTable(buffer); + FieldTable extractedOuterTable = EncodingUtils.readFieldTable(new DataInputStream(new ByteArrayInputStream(data))); FieldTable extractedTable = extractedOuterTable.getFieldTable("innerTable"); @@ -567,7 +569,7 @@ public class PropertyFieldTableTest extends TestCase Assert.assertEquals("Hello", table.getObject("object-string")); } - public void testwriteBuffer() + public void testwriteBuffer() throws IOException { byte[] bytes = { 99, 98, 97, 96, 95 }; @@ -585,15 +587,17 @@ public class PropertyFieldTableTest extends TestCase table.setString("string", "hello"); table.setString("null-string", null); - final ByteBuffer buffer = ByteBuffer.allocate((int) table.getEncodedSize() + 4); // FIXME XXX: Is cast a problem? - table.writeToBuffer(buffer); + ByteArrayOutputStream baos = new ByteArrayOutputStream((int) table.getEncodedSize() + 4); + table.writeToBuffer(new DataOutputStream(baos)); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream dis = new DataInputStream(bais); - buffer.flip(); - long length = buffer.getUnsignedInt(); + long length = dis.readInt() & 0xFFFFFFFFL; - FieldTable table2 = new FieldTable(buffer, length); + FieldTable table2 = new FieldTable(dis, length); Assert.assertEquals((Boolean) true, table2.getBoolean("bool")); Assert.assertEquals((Byte) Byte.MAX_VALUE, table2.getByte("byte")); diff --git a/java/common/src/test/java/org/apache/qpid/session/TestSession.java b/java/common/src/test/java/org/apache/qpid/session/TestSession.java deleted file mode 100644 index aafc91b03b..0000000000 --- a/java/common/src/test/java/org/apache/qpid/session/TestSession.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.session; - -import org.apache.mina.common.*; - -import java.net.SocketAddress; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentHashMap; - -public class TestSession implements IoSession -{ - private final ConcurrentMap attributes = new ConcurrentHashMap(); - - public TestSession() - { - } - - public IoService getService() - { - return null; //TODO - } - - public IoServiceConfig getServiceConfig() - { - return null; //TODO - } - - public IoHandler getHandler() - { - return null; //TODO - } - - public IoSessionConfig getConfig() - { - return null; //TODO - } - - public IoFilterChain getFilterChain() - { - return null; //TODO - } - - public WriteFuture write(Object message) - { - return null; //TODO - } - - public CloseFuture close() - { - return null; //TODO - } - - public Object getAttachment() - { - return getAttribute(""); - } - - public Object setAttachment(Object attachment) - { - return setAttribute("",attachment); - } - - public Object getAttribute(String key) - { - return attributes.get(key); - } - - public Object setAttribute(String key, Object value) - { - return attributes.put(key,value); - } - - public Object setAttribute(String key) - { - return attributes.put(key, Boolean.TRUE); - } - - public Object removeAttribute(String key) - { - return attributes.remove(key); - } - - public boolean containsAttribute(String key) - { - return attributes.containsKey(key); - } - - public Set getAttributeKeys() - { - return attributes.keySet(); - } - - public TransportType getTransportType() - { - return null; //TODO - } - - public boolean isConnected() - { - return false; //TODO - } - - public boolean isClosing() - { - return false; //TODO - } - - public CloseFuture getCloseFuture() - { - return null; //TODO - } - - public SocketAddress getRemoteAddress() - { - return null; //TODO - } - - public SocketAddress getLocalAddress() - { - return null; //TODO - } - - public SocketAddress getServiceAddress() - { - return null; //TODO - } - - public int getIdleTime(IdleStatus status) - { - return 0; //TODO - } - - public long getIdleTimeInMillis(IdleStatus status) - { - return 0; //TODO - } - - public void setIdleTime(IdleStatus status, int idleTime) - { - //TODO - } - - public int getWriteTimeout() - { - return 0; //TODO - } - - public long getWriteTimeoutInMillis() - { - return 0; //TODO - } - - public void setWriteTimeout(int writeTimeout) - { - //TODO - } - - public TrafficMask getTrafficMask() - { - return null; //TODO - } - - public void setTrafficMask(TrafficMask trafficMask) - { - //TODO - } - - public void suspendRead() - { - //TODO - } - - public void suspendWrite() - { - //TODO - } - - public void resumeRead() - { - //TODO - } - - public void resumeWrite() - { - //TODO - } - - public long getReadBytes() - { - return 0; //TODO - } - - public long getWrittenBytes() - { - return 0; //TODO - } - - public long getReadMessages() - { - return 0; - } - - public long getWrittenMessages() - { - return 0; - } - - public long getWrittenWriteRequests() - { - return 0; //TODO - } - - public int getScheduledWriteRequests() - { - return 0; //TODO - } - - public int getScheduledWriteBytes() - { - return 0; //TODO - } - - public long getCreationTime() - { - return 0; //TODO - } - - public long getLastIoTime() - { - return 0; //TODO - } - - public long getLastReadTime() - { - return 0; //TODO - } - - public long getLastWriteTime() - { - return 0; //TODO - } - - public boolean isIdle(IdleStatus status) - { - return false; //TODO - } - - public int getIdleCount(IdleStatus status) - { - return 0; //TODO - } - - public long getLastIdleTime(IdleStatus status) - { - return 0; //TODO - } -} diff --git a/java/common/src/test/java/org/apache/qpid/ssl/SSLContextFactoryTest.java b/java/common/src/test/java/org/apache/qpid/ssl/SSLContextFactoryTest.java new file mode 100644 index 0000000000..288946e064 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/ssl/SSLContextFactoryTest.java @@ -0,0 +1,84 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.ssl; + +import java.io.IOException; + +import javax.net.ssl.SSLContext; + +import org.apache.qpid.test.utils.QpidTestCase; + +public class SSLContextFactoryTest extends QpidTestCase +{ + private static final String BROKER_KEYSTORE_PATH = TEST_RESOURCES_DIR + "/ssl/java_broker_keystore.jks"; + private static final String CLIENT_KEYSTORE_PATH = TEST_RESOURCES_DIR + "/ssl/java_client_keystore.jks"; + private static final String CLIENT_TRUSTSTORE_PATH = TEST_RESOURCES_DIR + "/ssl/java_client_truststore.jks"; + private static final String STORE_PASSWORD = "password"; + private static final String CERT_TYPE = "SunX509"; + private static final String CERT_ALIAS_APP1 = "app1"; + + public void testBuildServerContext() throws Exception + { + SSLContext context = SSLContextFactory.buildServerContext(BROKER_KEYSTORE_PATH, STORE_PASSWORD, CERT_TYPE); + assertNotNull("SSLContext should not be null", context); + } + + public void testBuildServerContextWithIncorrectPassword() throws Exception + { + try + { + SSLContextFactory.buildServerContext(BROKER_KEYSTORE_PATH, "sajdklsad", CERT_TYPE); + fail("Exception was not thrown due to incorrect password"); + } + catch (IOException e) + { + //expected + } + } + + public void testTrustStoreDoesNotExist() throws Exception + { + try + { + SSLContextFactory.buildClientContext("/path/to/nothing", STORE_PASSWORD, CERT_TYPE, CLIENT_KEYSTORE_PATH, STORE_PASSWORD, CERT_TYPE, null); + fail("Exception was not thrown due to incorrect path"); + } + catch (IOException e) + { + //expected + } + } + + public void testBuildClientContextForSSLEncryptionOnly() throws Exception + { + SSLContext context = SSLContextFactory.buildClientContext(CLIENT_TRUSTSTORE_PATH, STORE_PASSWORD, CERT_TYPE, null, null, null, null); + assertNotNull("SSLContext should not be null", context); + } + + public void testBuildClientContextWithForClientAuth() throws Exception + { + SSLContext context = SSLContextFactory.buildClientContext(CLIENT_TRUSTSTORE_PATH, STORE_PASSWORD, CERT_TYPE, CLIENT_KEYSTORE_PATH, STORE_PASSWORD, CERT_TYPE, null); + assertNotNull("SSLContext should not be null", context); + } + + public void testBuildClientContextWithForClientAuthWithCertAlias() throws Exception + { + SSLContext context = SSLContextFactory.buildClientContext(CLIENT_TRUSTSTORE_PATH, STORE_PASSWORD, CERT_TYPE, CLIENT_KEYSTORE_PATH, STORE_PASSWORD, CERT_TYPE, CERT_ALIAS_APP1); + assertNotNull("SSLContext should not be null", context); + } +} diff --git a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java index 8b470d555e..e69f95f916 100644 --- a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -24,17 +24,28 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.net.DatagramSocket; +import java.net.ServerSocket; +import java.util.*; import junit.framework.TestCase; import junit.framework.TestResult; +import org.apache.log4j.Level; import org.apache.log4j.Logger; + public class QpidTestCase extends TestCase { - protected static final Logger _logger = Logger.getLogger(QpidTestCase.class); + public static final String QPID_HOME = System.getProperty("QPID_HOME"); + public static final String TEST_RESOURCES_DIR = QPID_HOME + "/../test-profiles/test_resources/"; + + private static final Logger _logger = Logger.getLogger(QpidTestCase.class); + + private final Map<Logger, Level> _loggerLevelSetForTest = new HashMap<Logger, Level>(); + private final Map<String, String> _propertiesSetForTest = new HashMap<String, String>(); + + private String _testName; /** * Some tests are excluded when the property test.excludes is set to true. @@ -54,7 +65,7 @@ public class QpidTestCase extends TestCase String exclusionListString = System.getProperties().getProperty("test.excludelist", ""); List<String> exclusionList = new ArrayList<String>(); - for (String uri : exclusionListURIs.split("\\s+")) + for (String uri : exclusionListURIs.split(";\\s*")) { File file = new File(uri); if (file.exists()) @@ -76,6 +87,10 @@ public class QpidTestCase extends TestCase _logger.warn("Exception when reading exclusion list", e); } } + else + { + _logger.info("Specified exclude file does not exist: " + uri); + } } if (!exclusionListString.equals("")) @@ -127,4 +142,187 @@ public class QpidTestCase extends TestCase return storeClass != null ? storeClass : MEMORY_STORE_CLASS_NAME ; } + + + public static final int MIN_PORT_NUMBER = 1; + public static final int MAX_PORT_NUMBER = 49151; + + + /** + * Gets the next available port starting at a port. + * + * @param fromPort the port to scan for availability + * @throws NoSuchElementException if there are no ports available + */ + protected int getNextAvailable(int fromPort) + { + if ((fromPort < MIN_PORT_NUMBER) || (fromPort > MAX_PORT_NUMBER)) + { + throw new IllegalArgumentException("Invalid start port: " + fromPort); + } + + for (int i = fromPort; i <= MAX_PORT_NUMBER; i++) + { + if (available(i)) { + return i; + } + } + + throw new NoSuchElementException("Could not find an available port above " + fromPort); + } + + /** + * Checks to see if a specific port is available. + * + * @param port the port to check for availability + */ + private boolean available(int port) + { + if ((port < MIN_PORT_NUMBER) || (port > MAX_PORT_NUMBER)) + { + throw new IllegalArgumentException("Invalid start port: " + port); + } + + ServerSocket ss = null; + DatagramSocket ds = null; + try + { + ss = new ServerSocket(port); + ss.setReuseAddress(true); + ds = new DatagramSocket(port); + ds.setReuseAddress(true); + return true; + } + catch (IOException e) + { + } + finally + { + if (ds != null) + { + ds.close(); + } + + if (ss != null) + { + try + { + ss.close(); + } + catch (IOException e) + { + /* should not be thrown */ + } + } + } + + return false; + } + + public int findFreePort() + { + return getNextAvailable(10000); + } + + /** + * Set a System property for duration of this test only. The tearDown will + * guarantee to reset the property to its previous value after the test + * completes. + * + * @param property The property to set + * @param value the value to set it to, if null, the property will be cleared + */ + protected void setTestSystemProperty(final String property, final String value) + { + if (!_propertiesSetForTest.containsKey(property)) + { + // Record the current value so we can revert it later. + _propertiesSetForTest.put(property, System.getProperty(property)); + } + + if (value == null) + { + System.clearProperty(property); + } + else + { + System.setProperty(property, value); + } + } + + /** + * Restore the System property values that were set by this test run. + */ + protected void revertTestSystemProperties() + { + if(!_propertiesSetForTest.isEmpty()) + { + _logger.debug("reverting " + _propertiesSetForTest.size() + " test properties"); + for (String key : _propertiesSetForTest.keySet()) + { + String value = _propertiesSetForTest.get(key); + if (value != null) + { + System.setProperty(key, value); + } + else + { + System.clearProperty(key); + } + } + + _propertiesSetForTest.clear(); + } + } + + /** + * Adjust the VMs Log4j Settings just for this test run + * + * @param logger the logger to change + * @param level the level to set + */ + protected void setLoggerLevel(Logger logger, Level level) + { + assertNotNull("Cannot set level of null logger", logger); + assertNotNull("Cannot set Logger("+logger.getName()+") to null level.",level); + + if (!_loggerLevelSetForTest.containsKey(logger)) + { + // Record the current value so we can revert it later. + _loggerLevelSetForTest.put(logger, logger.getLevel()); + } + + logger.setLevel(level); + } + + /** + * Restore the logging levels defined by this test. + */ + protected void revertLoggingLevels() + { + for (Logger logger : _loggerLevelSetForTest.keySet()) + { + logger.setLevel(_loggerLevelSetForTest.get(logger)); + } + + _loggerLevelSetForTest.clear(); + } + + protected void tearDown() throws java.lang.Exception + { + _logger.info("========== tearDown " + _testName + " =========="); + revertTestSystemProperties(); + revertLoggingLevels(); + } + + protected void setUp() throws Exception + { + _testName = getClass().getSimpleName() + "." + getName(); + _logger.info("========== start " + _testName + " =========="); + } + + protected String getTestName() + { + return _testName; + } } diff --git a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index 375a326654..49f6a08007 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -20,32 +20,27 @@ */ package org.apache.qpid.transport; -import org.apache.mina.util.AvailablePortFinder; - -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.network.ConnectionBinding; -import org.apache.qpid.transport.network.io.IoAcceptor; -import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.transport.util.Waiter; +import static org.apache.qpid.transport.Option.EXPECTED; +import static org.apache.qpid.transport.Option.NONE; +import static org.apache.qpid.transport.Option.SYNC; +import java.io.IOException; import java.util.ArrayList; -import java.util.List; import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.io.IOException; -import static org.apache.qpid.transport.Option.*; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.network.ConnectionBinding; +import org.apache.qpid.transport.network.io.IoAcceptor; +import org.apache.qpid.transport.util.Waiter; /** * ConnectionTest */ - public class ConnectionTest extends QpidTestCase implements SessionListener { - - private static final Logger log = Logger.get(ConnectionTest.class); - private int port; private volatile boolean queue = false; private List<MessageTransfer> messages = new ArrayList<MessageTransfer>(); @@ -58,7 +53,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener { super.setUp(); - port = AvailablePortFinder.getNextAvailable(12000); + port = findFreePort(); } protected void tearDown() throws Exception @@ -158,7 +153,8 @@ public class ConnectionTest extends QpidTestCase implements SessionListener private Connection connect(final CountDownLatch closed) { - Connection conn = new Connection(); + final Connection conn = new Connection(); + conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())); conn.addConnectionListener(new ConnectionListener() { public void opened(Connection conn) {} @@ -182,9 +178,9 @@ public class ConnectionTest extends QpidTestCase implements SessionListener { // Force os.name to be windows to exercise code in IoReceiver // that looks for the value of os.name - System.setProperty("os.name","windows"); + setTestSystemProperty("os.name","windows"); - // Start server as 0-9 to froce a ProtocolVersionException + // Start server as 0-9 to force a ProtocolVersionException startServer(new ProtocolHeader(1, 0, 9)); CountDownLatch closed = new CountDownLatch(1); @@ -219,7 +215,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener conn.send(protocolHeader); List<Object> utf8 = new ArrayList<Object>(); utf8.add("utf8"); - conn.connectionStart(null, Collections.EMPTY_LIST, utf8); + conn.connectionStart(null, Collections.emptyList(), utf8); } @Override @@ -270,40 +266,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener } } - class FailoverConnectionListener implements ConnectionListener - { - public void opened(Connection conn) {} - - public void exception(Connection conn, ConnectionException e) - { - throw e; - } - - public void closed(Connection conn) - { - queue = true; - conn.connect("localhost", port, null, "guest", "guest"); - conn.resume(); - } - } - - class TestSessionListener implements SessionListener - { - public void opened(Session s) {} - public void resumed(Session s) {} - public void exception(Session s, SessionException e) {} - public void message(Session s, MessageTransfer xfr) - { - synchronized (incoming) - { - incoming.add(xfr); - incoming.notifyAll(); - } - s.processed(xfr); - } - public void closed(Session s) {} - } public void testResumeNonemptyReplayBuffer() throws Exception { @@ -311,6 +274,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener Connection conn = new Connection(); conn.addConnectionListener(new FailoverConnectionListener()); + conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())); conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(1); ssn.setSessionListener(new TestSessionListener()); @@ -365,6 +329,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener startServer(); Connection conn = new Connection(); + conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())); conn.addConnectionListener(new FailoverConnectionListener()); conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(1); @@ -387,6 +352,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener startServer(); Connection conn = new Connection(); + conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())); conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(); ssn.sessionFlush(EXPECTED); @@ -400,6 +366,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener { startServer(); Connection conn = new Connection(); + conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())); conn.connect("localhost", port, null, "guest", "guest"); conn.connectionHeartbeat(); conn.close(); @@ -410,6 +377,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener startServer(); Connection conn = new Connection(); + conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())); conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(); send(ssn, "EXCP 0"); @@ -429,6 +397,7 @@ public class ConnectionTest extends QpidTestCase implements SessionListener startServer(); Connection conn = new Connection(); + conn.setConnectionDelegate(new ClientDelegate(new ConnectionSettings())); conn.connect("localhost", port, null, "guest", "guest"); Session ssn = conn.createSession(); send(ssn, "EXCP 0", true); @@ -443,4 +412,38 @@ public class ConnectionTest extends QpidTestCase implements SessionListener } } + class FailoverConnectionListener implements ConnectionListener + { + public void opened(Connection conn) {} + + public void exception(Connection conn, ConnectionException e) + { + throw e; + } + + public void closed(Connection conn) + { + queue = true; + conn.connect("localhost", port, null, "guest", "guest"); + conn.resume(); + } + } + + class TestSessionListener implements SessionListener + { + public void opened(Session s) {} + public void resumed(Session s) {} + public void exception(Session s, SessionException e) {} + public void message(Session s, MessageTransfer xfr) + { + synchronized (incoming) + { + incoming.add(xfr); + incoming.notifyAll(); + } + + s.processed(xfr); + } + public void closed(Session s) {} + } } diff --git a/java/common/src/test/java/org/apache/qpid/transport/MockSender.java b/java/common/src/test/java/org/apache/qpid/transport/MockSender.java new file mode 100644 index 0000000000..4b38b7318a --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/transport/MockSender.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.nio.ByteBuffer; + +public class MockSender implements Sender<ByteBuffer> +{ + + public void setIdleTimeout(int i) + { + + } + + public void send(ByteBuffer msg) + { + + } + + public void flush() + { + + } + + public void close() + { + + } +} diff --git a/java/common/src/test/java/org/apache/qpid/transport/SessionTimeoutTest.java b/java/common/src/test/java/org/apache/qpid/transport/SessionTimeoutTest.java new file mode 100644 index 0000000000..5f1c1254a2 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/transport/SessionTimeoutTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.transport; + +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.test.utils.QpidTestCase; + + +public class SessionTimeoutTest extends QpidTestCase +{ + public void testSessionTimeout() + { + try + { + long timeout = 1; + setTestSystemProperty("qpid.sync_op_timeout", Long.toString(timeout)); + assertSessionTimeout(timeout); + } + finally + { + revertTestSystemProperties(); + } + } + + public void testSessionTimeoutSetWith_amqj_default_syncwrite_timeout() + { + try + { + long timeout = 1; + setTestSystemProperty("amqj.default_syncwrite_timeout", Long.toString(timeout)); + setTestSystemProperty("qpid.sync_op_timeout", null); + assertSessionTimeout(timeout); + } + finally + { + revertTestSystemProperties(); + } + } + + private void assertSessionTimeout(long timeout) + { + Session session = new TestSession(null, null, 0); + long startTime = System.currentTimeMillis(); + try + { + session.awaitOpen(); + fail("SessionTimeoutException is expected!"); + } + catch (SessionException e) + { + long elapsedTime = System.currentTimeMillis() - startTime; + assertTrue("Expected timeout should happened in " + timeout + " ms but timeout occured in " + + elapsedTime + " ms!", elapsedTime >= timeout && elapsedTime < ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT); + } + } + + class TestSession extends Session + { + public TestSession(Connection connection, Binary name, long expiry) + { + super(connection, name, expiry); + } + } + +} diff --git a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java index 957a7190ee..8533c64fab 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java +++ b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java @@ -25,34 +25,36 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.NetworkConnection; /** * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, * so if this class is being used and some methods are to be used, then please update those. */ -public class TestNetworkDriver implements NetworkDriver +public class TestNetworkConnection implements NetworkConnection { - private final ConcurrentMap attributes = new ConcurrentHashMap(); private String _remoteHost = "127.0.0.1"; private String _localHost = "127.0.0.1"; private int _port = 1; private SocketAddress _localAddress = null; private SocketAddress _remoteAddress = null; + private final MockSender _sender; - public TestNetworkDriver() + public TestNetworkConnection() { + _sender = new MockSender(); } + + public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException + NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws BindException { - + } public SocketAddress getLocalAddress() @@ -65,40 +67,40 @@ public class TestNetworkDriver implements NetworkDriver return (_remoteAddress != null) ? _remoteAddress : new InetSocketAddress(_remoteHost, _port); } - public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws OpenException { - + } public void setMaxReadIdle(int idleTime) { - + } public void setMaxWriteIdle(int idleTime) { - + } public void close() { - + } public void flush() { - + } public void send(ByteBuffer msg) { - + } public void setIdleTimeout(int i) { - + } public void setPort(int port) @@ -130,4 +132,13 @@ public class TestNetworkDriver implements NetworkDriver { _remoteAddress = address; } + + public Sender<ByteBuffer> getSender() + { + return _sender; + } + + public void start() + { + } } diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java b/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java new file mode 100644 index 0000000000..7039b904e3 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java @@ -0,0 +1,157 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + + +import java.nio.ByteBuffer; + +import javax.net.ssl.SSLContext; + +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.io.IoNetworkTransport; + +public class TransportTest extends QpidTestCase +{ + + + + public void testDefaultGetOutgoingTransportForv0_8() throws Exception + { + final OutgoingNetworkTransport networkTransport = Transport.getOutgoingTransportInstance(ProtocolVersion.v8_0); + assertNotNull(networkTransport); + assertTrue(networkTransport instanceof IoNetworkTransport); + } + + public void testGloballyOverriddenOutgoingTransportForv0_8() throws Exception + { + setTestSystemProperty(Transport.QPID_TRANSPORT_PROPNAME, TestOutgoingNetworkTransport.class.getName()); + + final OutgoingNetworkTransport networkTransport = Transport.getOutgoingTransportInstance(ProtocolVersion.v8_0); + assertNotNull(networkTransport); + assertTrue(networkTransport instanceof TestOutgoingNetworkTransport); + } + + public void testProtocolSpecificOverriddenOutgoingTransportForv0_8() throws Exception + { + setTestSystemProperty(Transport.QPID_TRANSPORT_V0_8_PROPNAME, TestOutgoingNetworkTransport.class.getName()); + + final OutgoingNetworkTransport networkTransport = Transport.getOutgoingTransportInstance(ProtocolVersion.v8_0); + assertNotNull(networkTransport); + assertTrue(networkTransport instanceof TestOutgoingNetworkTransport); + } + + public void testDefaultGetOutgoingTransportForv0_10() throws Exception + { + final OutgoingNetworkTransport networkTransport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10); + assertNotNull(networkTransport); + assertTrue(networkTransport instanceof IoNetworkTransport); + } + + public void testDefaultGetIncomingTransport() throws Exception + { + final IncomingNetworkTransport networkTransport = Transport.getIncomingTransportInstance(); + assertNotNull(networkTransport); + assertTrue(networkTransport instanceof IoNetworkTransport); + } + + public void testOverriddenGetIncomingTransport() throws Exception + { + setTestSystemProperty(Transport.QPID_BROKER_TRANSPORT_PROPNAME, TestIncomingNetworkTransport.class.getName()); + + final IncomingNetworkTransport networkTransport = Transport.getIncomingTransportInstance(); + assertNotNull(networkTransport); + assertTrue(networkTransport instanceof TestIncomingNetworkTransport); + } + + public void testInvalidOutgoingTransportClassName() throws Exception + { + setTestSystemProperty(Transport.QPID_TRANSPORT_PROPNAME, "invalid"); + + try + { + Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10); + fail("Should have failed to load the invalid class"); + } + catch(TransportException te) + { + //expected, ignore + } + } + + public void testInvalidOutgoingTransportProtocolVersion() throws Exception + { + try + { + Transport.getOutgoingTransportInstance(new ProtocolVersion((byte)0, (byte)0)); + fail("Should have failed to load the transport for invalid protocol version"); + } + catch(IllegalArgumentException iae) + { + //expected, ignore + } + } + + public static class TestOutgoingNetworkTransport implements OutgoingNetworkTransport + { + + public void close() + { + throw new UnsupportedOperationException(); + } + + public NetworkConnection getConnection() + { + throw new UnsupportedOperationException(); + } + + public NetworkConnection connect(ConnectionSettings settings, + Receiver<ByteBuffer> delegate, SSLContext sslContext) + { + throw new UnsupportedOperationException(); + } + } + + public static class TestIncomingNetworkTransport implements IncomingNetworkTransport + { + + public void close() + { + throw new UnsupportedOperationException(); + } + + public NetworkConnection getConnection() + { + throw new UnsupportedOperationException(); + } + + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, SSLContext sslContext) + { + throw new UnsupportedOperationException(); + } + } +} diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java new file mode 100644 index 0000000000..e075681acb --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.TransportException; + +import java.io.IOException; + +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; + +import java.nio.ByteBuffer; + + +/** + * IoAcceptor + * + */ + +public class IoAcceptor<E> extends Thread +{ + + + private ServerSocket socket; + private Binding<E,ByteBuffer> binding; + + public IoAcceptor(SocketAddress address, Binding<E,ByteBuffer> binding) + throws IOException + { + socket = new ServerSocket(); + socket.setReuseAddress(true); + socket.bind(address); + this.binding = binding; + + setName(String.format("IoAcceptor - %s", socket.getInetAddress())); + } + + /** + Close the underlying ServerSocket if it has not already been closed. + */ + public void close() throws IOException + { + if (!socket.isClosed()) + { + socket.close(); + } + } + + public IoAcceptor(String host, int port, Binding<E,ByteBuffer> binding) + throws IOException + { + this(new InetSocketAddress(host, port), binding); + } + + public void run() + { + while (true) + { + try + { + Socket sock = socket.accept(); + IoTransport<E> transport = new IoTransport<E>(sock, binding); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + } + +} diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java new file mode 100644 index 0000000000..215c6d9931 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.transport.network.io; + +import java.net.Socket; +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.util.Logger; + +/** + * This class provides a socket based transport using the java.io + * classes. + * + * The following params are configurable via JVM arguments + * TCP_NO_DELAY - amqj.tcpNoDelay + * SO_RCVBUF - amqj.receiveBufferSize + * SO_SNDBUF - amqj.sendBufferSize + */ +public final class IoTransport<E> +{ + + + private static final Logger log = Logger.get(IoTransport.class); + + private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024; + private static int readBufferSize = Integer.getInteger + ("amqj.receiveBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE); + private static int writeBufferSize = Integer.getInteger + ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE); + + private Socket socket; + private Sender<ByteBuffer> sender; + private E endpoint; + private IoReceiver receiver; + private long timeout = 60000; + + IoTransport(Socket socket, Binding<E,ByteBuffer> binding) + { + this.socket = socket; + setupTransport(socket, binding); + } + + private void setupTransport(Socket socket, Binding<E, ByteBuffer> binding) + { + IoSender ios = new IoSender(socket, 2*writeBufferSize, timeout); + ios.initiate(); + + this.sender = ios; + this.endpoint = binding.endpoint(sender); + this.receiver = new IoReceiver(socket, binding.receiver(endpoint), + 2*readBufferSize, timeout); + this.receiver.initiate(); + + ios.registerCloseListener(this.receiver); + } + + public Sender<ByteBuffer> getSender() + { + return sender; + } + + public IoReceiver getReceiver() + { + return receiver; + } + + public Socket getSocket() + { + return socket; + } + +} diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java b/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java deleted file mode 100644 index fc8e689ca4..0000000000 --- a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java +++ /dev/null @@ -1,494 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.transport.network.mina; - -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import junit.framework.TestCase; - -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.OpenException; - -public class MINANetworkDriverTest extends TestCase -{ - - private static final String TEST_DATA = "YHALOTHAR"; - private static int TEST_PORT = 2323; - private NetworkDriver _server; - private NetworkDriver _client; - private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's read - private Exception _thrownEx; - - @Override - public void setUp() - { - _server = new MINANetworkDriver(); - _client = new MINANetworkDriver(); - _thrownEx = null; - _countingEngine = new CountingProtocolEngine(); - // increment the port to prevent tests clashing with each other when - // the port is in TIMED_WAIT state. - TEST_PORT++; - } - - @Override - public void tearDown() - { - if (_server != null) - { - _server.close(); - } - - if (_client != null) - { - _client.close(); - } - } - - /** - * Tests that a socket can't be opened if a driver hasn't been bound - * to the port and can be opened if a driver has been bound. - * @throws BindException - * @throws UnknownHostException - * @throws OpenException - */ - public void testBindOpen() throws BindException, UnknownHostException, OpenException - { - try - { - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - } - catch (OpenException e) - { - _thrownEx = e; - } - - assertNotNull("Open should have failed since no engine bound", _thrownEx); - - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - } - - /** - * Tests that a socket can't be opened after a bound NetworkDriver has been closed - * @throws BindException - * @throws UnknownHostException - * @throws OpenException - */ - public void testBindOpenCloseOpen() throws BindException, UnknownHostException, OpenException - { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - _client.close(); - _server.close(); - - try - { - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - } - catch (OpenException e) - { - _thrownEx = e; - } - assertNotNull("Open should have failed", _thrownEx); - } - - /** - * Checks that the right exception is thrown when binding a NetworkDriver to an already - * existing socket. - */ - public void testBindPortInUse() - { - try - { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - } - catch (BindException e) - { - fail("First bind should not fail"); - } - - try - { - _client.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - } - catch (BindException e) - { - _thrownEx = e; - } - assertNotNull("Second bind should throw BindException", _thrownEx); - } - - /** - * tests that bytes sent on a network driver are received at the other end - * - * @throws UnknownHostException - * @throws OpenException - * @throws InterruptedException - * @throws BindException - */ - public void testSend() throws UnknownHostException, OpenException, InterruptedException, BindException - { - // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - - // Tell the counting engine how much data we're sending - _countingEngine.setNewLatch(TEST_DATA.getBytes().length); - - // Send the data and wait for up to 2 seconds to get it back - _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); - _countingEngine.getLatch().await(2, TimeUnit.SECONDS); - - // Check what we got - assertEquals("Wrong amount of data recieved", TEST_DATA.getBytes().length, _countingEngine.getReadBytes()); - } - - /** - * Opens a connection with a low read idle and check that it gets triggered - * @throws BindException - * @throws OpenException - * @throws UnknownHostException - * - */ - public void testSetReadIdle() throws BindException, UnknownHostException, OpenException - { - // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - assertFalse("Reader should not have been idle", _countingEngine.getReaderHasBeenIdle()); - _client.setMaxReadIdle(1); - sleepForAtLeast(1500); - assertTrue("Reader should have been idle", _countingEngine.getReaderHasBeenIdle()); - } - - /** - * Opens a connection with a low write idle and check that it gets triggered - * @throws BindException - * @throws OpenException - * @throws UnknownHostException - * - */ - public void testSetWriteIdle() throws BindException, UnknownHostException, OpenException - { - // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - assertFalse("Reader should not have been idle", _countingEngine.getWriterHasBeenIdle()); - _client.setMaxWriteIdle(1); - sleepForAtLeast(1500); - assertTrue("Reader should have been idle", _countingEngine.getWriterHasBeenIdle()); - } - - - /** - * Creates and then closes a connection from client to server and checks that the server - * has its closed() method called. Then creates a new client and closes the server to check - * that the client has its closed() method called. - * @throws BindException - * @throws UnknownHostException - * @throws OpenException - */ - public void testClosed() throws BindException, UnknownHostException, OpenException - { - // Open a connection from a counting engine to an echo engine - EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory(); - _server.bind(TEST_PORT, null, factory, null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - EchoProtocolEngine serverEngine = null; - while (serverEngine == null) - { - serverEngine = factory.getEngine(); - if (serverEngine == null) - { - try - { - Thread.sleep(10); - } - catch (InterruptedException e) - { - } - } - } - assertFalse("Server should not have been closed", serverEngine.getClosed()); - serverEngine.setNewLatch(1); - _client.close(); - try - { - serverEngine.getLatch().await(2, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - } - assertTrue("Server should have been closed", serverEngine.getClosed()); - - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - _countingEngine.setClosed(false); - assertFalse("Client should not have been closed", _countingEngine.getClosed()); - _countingEngine.setNewLatch(1); - _server.close(); - try - { - _countingEngine.getLatch().await(2, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - } - assertTrue("Client should have been closed", _countingEngine.getClosed()); - } - - /** - * Create a connection and instruct the client to throw an exception when it gets some data - * and that the latch gets counted down. - * @throws BindException - * @throws UnknownHostException - * @throws OpenException - * @throws InterruptedException - */ - public void testExceptionCaught() throws BindException, UnknownHostException, OpenException, InterruptedException - { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - - - assertEquals("Exception should not have been thrown", 1, - _countingEngine.getExceptionLatch().getCount()); - _countingEngine.setErrorOnNextRead(true); - _countingEngine.setNewLatch(TEST_DATA.getBytes().length); - _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); - _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS); - assertEquals("Exception should have been thrown", 0, - _countingEngine.getExceptionLatch().getCount()); - } - - /** - * Opens a connection and checks that the remote address is the one that was asked for - * @throws BindException - * @throws UnknownHostException - * @throws OpenException - */ - public void testGetRemoteAddress() throws BindException, UnknownHostException, OpenException - { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), TEST_PORT), - _client.getRemoteAddress()); - } - - private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory - { - EchoProtocolEngine _engine = null; - - public ProtocolEngine newProtocolEngine(NetworkDriver driver) - { - if (_engine == null) - { - _engine = new EchoProtocolEngine(); - _engine.setNetworkDriver(driver); - } - return getEngine(); - } - - public EchoProtocolEngine getEngine() - { - return _engine; - } - } - - public class CountingProtocolEngine implements ProtocolEngine - { - - protected NetworkDriver _driver; - public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>(); - private int _readBytes; - private CountDownLatch _latch = new CountDownLatch(0); - private boolean _readerHasBeenIdle; - private boolean _writerHasBeenIdle; - private boolean _closed = false; - private boolean _nextReadErrors = false; - private CountDownLatch _exceptionLatch = new CountDownLatch(1); - - public void closed() - { - setClosed(true); - _latch.countDown(); - } - - public void setErrorOnNextRead(boolean b) - { - _nextReadErrors = b; - } - - public void setNewLatch(int length) - { - _latch = new CountDownLatch(length); - } - - public long getReadBytes() - { - return _readBytes; - } - - public SocketAddress getRemoteAddress() - { - if (_driver != null) - { - return _driver.getRemoteAddress(); - } - else - { - return null; - } - } - - public SocketAddress getLocalAddress() - { - if (_driver != null) - { - return _driver.getLocalAddress(); - } - else - { - return null; - } - } - - public long getWrittenBytes() - { - return 0; - } - - public void readerIdle() - { - _readerHasBeenIdle = true; - } - - public void setNetworkDriver(NetworkDriver driver) - { - _driver = driver; - } - - public void writeFrame(AMQDataBlock frame) - { - - } - - public void writerIdle() - { - _writerHasBeenIdle = true; - } - - public void exception(Throwable t) - { - _exceptionLatch.countDown(); - } - - public CountDownLatch getExceptionLatch() - { - return _exceptionLatch; - } - - public void received(ByteBuffer msg) - { - // increment read bytes and count down the latch for that many - int bytes = msg.remaining(); - _readBytes += bytes; - for (int i = 0; i < bytes; i++) - { - _latch.countDown(); - } - - // Throw an error if we've been asked too, but we can still count - if (_nextReadErrors) - { - throw new RuntimeException("Was asked to error"); - } - } - - public CountDownLatch getLatch() - { - return _latch; - } - - public boolean getWriterHasBeenIdle() - { - return _writerHasBeenIdle; - } - - public boolean getReaderHasBeenIdle() - { - return _readerHasBeenIdle; - } - - public void setClosed(boolean _closed) - { - this._closed = _closed; - } - - public boolean getClosed() - { - return _closed; - } - - } - - private class EchoProtocolEngine extends CountingProtocolEngine - { - - public void received(ByteBuffer msg) - { - super.received(msg); - msg.rewind(); - _driver.send(msg); - } - } - - public static void sleepForAtLeast(long period) - { - long start = System.currentTimeMillis(); - long timeLeft = period; - while (timeLeft > 0) - { - try - { - Thread.sleep(timeLeft); - } - catch (InterruptedException e) - { - // Ignore it - } - timeLeft = period - (System.currentTimeMillis() - start); - } - } -}
\ No newline at end of file diff --git a/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java b/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java index 7eba5f092e..d6767eb9c0 100644 --- a/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java +++ b/java/common/src/test/java/org/apache/qpid/util/FileUtilsTest.java @@ -27,7 +27,9 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.FileWriter; import java.io.IOException; +import java.io.InputStream; import java.util.List; +import java.util.Properties; public class FileUtilsTest extends TestCase { @@ -182,6 +184,20 @@ public class FileUtilsTest extends TestCase } } + + /** + * Helper method to create a temporary file with test content. + * + * @param test_data The data to store in the file + * + * @return The File reference + */ + private File createTestFileInTmpDir(final String testData) throws Exception + { + final File tmpFile = File.createTempFile("test", "tmp"); + + return createTestFile(tmpFile.getCanonicalPath(), testData); + } /** * Helper method to create a test file with a string content * @@ -302,8 +318,74 @@ public class FileUtilsTest extends TestCase // expected path } } + + /** + * Tests that openFileOrDefaultResource can open a file on the filesystem. + * + */ + public void testOpenFileOrDefaultResourceOpensFileOnFileSystem() throws Exception + { + final File testFile = createTestFileInTmpDir("src=tmpfile"); + final String filenameOnFilesystem = testFile.getCanonicalPath(); + final String defaultResource = "org/apache/qpid/util/default.properties"; + + + final InputStream is = FileUtils.openFileOrDefaultResource(filenameOnFilesystem, defaultResource, this.getClass().getClassLoader()); + assertNotNull("Stream must not be null", is); + final Properties p = new Properties(); + p.load(is); + assertEquals("tmpfile", p.getProperty("src")); + } /** + * Tests that openFileOrDefaultResource can open a file on the classpath. + * + */ + public void testOpenFileOrDefaultResourceOpensFileOnClasspath() throws Exception + { + final String mydefaultsResource = "org/apache/qpid/util/mydefaults.properties"; + final String defaultResource = "org/apache/qpid/util/default.properties"; + + + final InputStream is = FileUtils.openFileOrDefaultResource(mydefaultsResource, defaultResource, this.getClass().getClassLoader()); + assertNotNull("Stream must not be null", is); + final Properties p = new Properties(); + p.load(is); + assertEquals("mydefaults", p.getProperty("src")); + } + + /** + * Tests that openFileOrDefaultResource returns the default resource when file cannot be found. + */ + public void testOpenFileOrDefaultResourceOpensDefaultResource() throws Exception + { + final File fileThatDoesNotExist = new File("/does/not/exist.properties"); + assertFalse("Test must not exist", fileThatDoesNotExist.exists()); + + final String defaultResource = "org/apache/qpid/util/default.properties"; + + final InputStream is = FileUtils.openFileOrDefaultResource(fileThatDoesNotExist.getCanonicalPath(), defaultResource, this.getClass().getClassLoader()); + assertNotNull("Stream must not be null", is); + Properties p = new Properties(); + p.load(is); + assertEquals("default.properties", p.getProperty("src")); + } + + /** + * Tests that openFileOrDefaultResource returns null if neither the file nor + * the default resource can be found.. + */ + public void testOpenFileOrDefaultResourceReturnsNullWhenNeitherCanBeFound() throws Exception + { + + final String mydefaultsResource = "org/apache/qpid/util/doesnotexisteiether.properties"; + final String defaultResource = "org/apache/qpid/util/doesnotexisteiether.properties"; + + final InputStream is = FileUtils.openFileOrDefaultResource(mydefaultsResource, defaultResource, this.getClass().getClassLoader()); + assertNull("Stream must be null", is); + } + + /** * Given two lists of File arrays ensure they are the same length and all entries in Before are in After * * @param filesBefore File[] diff --git a/java/common/src/test/java/org/apache/qpid/util/default.properties b/java/common/src/test/java/org/apache/qpid/util/default.properties new file mode 100644 index 0000000000..cb522ea9a7 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/util/default.properties @@ -0,0 +1,2 @@ +# Used by FileUtilsTests +src=default.properties
\ No newline at end of file diff --git a/java/common/src/test/java/org/apache/qpid/util/mydefaults.properties b/java/common/src/test/java/org/apache/qpid/util/mydefaults.properties new file mode 100644 index 0000000000..6a49d927d0 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/util/mydefaults.properties @@ -0,0 +1,2 @@ +# Used by FileUtilsTests +src=mydefaults
\ No newline at end of file |
