diff options
| author | Stephen Vinoski <vinoski@apache.org> | 2006-11-18 03:48:15 +0000 |
|---|---|---|
| committer | Stephen Vinoski <vinoski@apache.org> | 2006-11-18 03:48:15 +0000 |
| commit | be9f473e274d6cfe4cf8d8b04dd3f5a171ba9de4 (patch) | |
| tree | 5f155aab31fc2f3871c0b7421d4d7c56e80f3b0a /java/client | |
| parent | 1db5a8a2329ec064d1683294ee1a3d8d233de42d (diff) | |
| download | qpid-python-be9f473e274d6cfe4cf8d8b04dd3f5a171ba9de4.tar.gz | |
complete bringing initial maven work to trunk
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@476431 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
| -rw-r--r-- | java/client/pom.xml | 152 | ||||
| -rw-r--r-- | java/client/src/main/java/log4j.properties (renamed from java/client/src/log4j.properties) | 0 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java | 14 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java | 38 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java | 9 | ||||
| -rw-r--r-- | java/client/src/test/java/org/apache/qpid/codec/BasicDeliverTest.java | 7 | ||||
| -rw-r--r-- | java/client/src/test/java/org/apache/qpid/codec/Client.java | 6 | ||||
| -rw-r--r-- | java/client/src/test/java/org/apache/qpid/codec/Server.java | 7 | ||||
| -rw-r--r-- | java/client/src/test/java/org/apache/qpid/mina/AcceptorTest.java | 14 | ||||
| -rw-r--r-- | java/client/src/test/java/org/apache/qpid/mina/WriterTest.java | 8 | ||||
| -rw-r--r-- | java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/TestIoSession.java | 6 |
11 files changed, 208 insertions, 53 deletions
diff --git a/java/client/pom.xml b/java/client/pom.xml new file mode 100644 index 0000000000..71e8fb59e9 --- /dev/null +++ b/java/client/pom.xml @@ -0,0 +1,152 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-client</artifactId> + <packaging>jar</packaging> + <version>1.0-incubating-M2-SNAPSHOT</version> + <name>Qpid Client</name> + <url>http://cwiki.apache.org/confluence/display/qpid</url> + + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid</artifactId> + <version>1.0-incubating-M2-SNAPSHOT</version> + </parent> + + <properties> + <topDirectoryLocation>..</topDirectoryLocation> + </properties> + + <dependencies> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + </dependency> + <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + <dependency> + <groupId>org.apache.mina</groupId> + <artifactId>mina-filter-ssl</artifactId> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymockclassextension</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>jmscts</groupId> + <artifactId>jmscts</artifactId> + <version>0.5-b2</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>xml-security</groupId> + <artifactId>xml-security</artifactId> + <version>1.0.4</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>ant-test</id> + <phase>test</phase> + <configuration> + <tasks unless="${maven.test.skip}"> + <taskdef name="junit" + classname="org.apache.tools.ant.taskdefs.optional.junit.JUnitTask"> + <classpath> + <path refid="maven.test.classpath"/> + </classpath> + </taskdef> + <mkdir dir="${project.build.directory}/test-classes"/> + <junit fork="yes" printsummary="yes" showoutput="true" haltonfailure="yes"> + <sysproperty key="amqj.noAutoCreateVMBroker" value="true"/> + <formatter type="plain"/> + <classpath> + <path refid="maven.test.classpath"/> + </classpath> + <batchtest fork="yes" todir="${project.build.directory}"> + <fileset dir="src/test/java"> + <include name="**/test/unit/**/*Test.java"/> + <exclude name="**/test/unit/**/*UnitTests.java"/> + </fileset> + </batchtest> + </junit> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/java/client/src/log4j.properties b/java/client/src/main/java/log4j.properties index 371cfb6d61..371cfb6d61 100644 --- a/java/client/src/log4j.properties +++ b/java/client/src/main/java/log4j.properties diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index 94eb1b3d7a..78d937f453 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client.transport; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.jms.BrokerDetails; @@ -28,7 +29,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; - +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import java.io.IOException; @@ -53,7 +54,7 @@ public class SocketTransportConnection implements ITransportConnection public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { - ByteBuffer.setPreferDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); + ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); // the MINA default is currently to use the pooled allocator although this may change in future // once more testing of the performance of the simple allocator has been done @@ -63,15 +64,17 @@ public class SocketTransportConnection implements ITransportConnection } final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector(); + SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); + // if we do not use our own thread model we get the MINA default which is to use // its own leader-follower model boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); if (readWriteThreading) { - ioConnector.setThreadModel(new ReadWriteThreadModel()); + cfg.setThreadModel(new ReadWriteThreadModel()); } - SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getSessionConfig(); + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true"))); scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE)); _logger.info("send-buffer-size = " + scfg.getSendBufferSize()); @@ -80,8 +83,7 @@ public class SocketTransportConnection implements ITransportConnection final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); protocolHandler.setUseSSL(brokerDetail.useSSL()); _logger.info("Attempting connection to " + address); - ioConnector.setHandler(protocolHandler); - ConnectFuture future = ioConnector.connect(address); + ConnectFuture future = ioConnector.connect(address, protocolHandler); // wait for connection to complete if (future.join(brokerDetail.getTimeout())) diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 5bb975b503..ead8308143 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -23,6 +23,8 @@ package org.apache.qpid.client.transport; import org.apache.log4j.Logger; import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoServiceConfig; + import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; @@ -32,6 +34,7 @@ import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; + import java.io.IOException; import java.util.HashMap; import java.util.Iterator; @@ -65,7 +68,9 @@ public class TransportConnection { _acceptor = new VmPipeAcceptor(); - _acceptor.setThreadModel(new ReadWriteThreadModel()); + IoServiceConfig config = _acceptor.getDefaultConfig(); + + config.setThreadModel(new ReadWriteThreadModel()); } public static ITransportConnection getInstance() throws AMQTransportConnectionException @@ -135,7 +140,7 @@ public class TransportConnection break; case VM: { - _instance = getVMTransport(details, Boolean.getBoolean("amqj.noAutoCreateVMBroker")); + _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); break; } } @@ -158,23 +163,20 @@ public class TransportConnection return -1; } - private static ITransportConnection getVMTransport(BrokerDetails details, boolean noAutoCreate) throws AMQVMBrokerCreationException + private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) throws AMQVMBrokerCreationException { int port = details.getPort(); if (!_inVmPipeAddress.containsKey(port)) { - if (noAutoCreate) + if (AutoCreate) { - throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled."); - + createVMBroker(port); } else { - _logger.info("Auto Creating VMBroker on port " + port); - createVMBroker(port); + throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled."); } - } return new VmPipeTransportConnection(port); @@ -195,9 +197,7 @@ public class TransportConnection provider = createBrokerInstance(port); - _acceptor.setLocalAddress(pipe); - _acceptor.setHandler(provider); - _acceptor.bind(); + _acceptor.bind(pipe, provider); _inVmPipeAddress.put(port, pipe); _logger.info("Created InVM Qpid.AMQP listening on port " + port); @@ -213,7 +213,7 @@ public class TransportConnection try { - _acceptor.unbind(); + _acceptor.unbind(pipe); } catch (Exception ignore) { @@ -225,10 +225,8 @@ public class TransportConnection provider = createBrokerInstance(port); } - _acceptor.setLocalAddress(pipe); - _acceptor.setHandler(provider); - _acceptor.bind(); - _inVmPipeAddress.put(port, _acceptor); + _acceptor.bind(pipe, provider); + _inVmPipeAddress.put(port, pipe); _logger.info("Created InVM Qpid.AMQP listening on port " + port); } catch (IOException justUseFirstException) @@ -296,14 +294,14 @@ public class TransportConnection public static void killAllVMBrokers() { _logger.info("Killing all VM Brokers"); + _acceptor.unbindAll(); Iterator keys = _inVmPipeAddress.keySet().iterator(); while (keys.hasNext()) { int id = (Integer) keys.next(); - - ((VmPipeAcceptor)_inVmPipeAddress.remove(id)).unbind(); + _inVmPipeAddress.remove(id); } } @@ -315,7 +313,7 @@ public class TransportConnection { _logger.info("Killing VM Broker:" + port); _inVmPipeAddress.remove(port); - _acceptor.unbind(); + _acceptor.unbind(pipe); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index b871759428..6287d70a56 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -28,6 +28,7 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.jms.BrokerDetails; import org.apache.log4j.Logger; import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; @@ -47,18 +48,18 @@ public class VmPipeTransportConnection implements ITransportConnection public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { final VmPipeConnector ioConnector = new VmPipeConnector(); + final IoServiceConfig cfg = ioConnector.getDefaultConfig(); ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance(); PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS, "AsynchronousReadFilter"); - ioConnector.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead); + cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead); PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS, "AsynchronousWriteFilter"); - ioConnector.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite); + cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite); final VmPipeAddress address = new VmPipeAddress(_port); _logger.info("Attempting connection to " + address); - ioConnector.setHandler(protocolHandler); - ConnectFuture future = ioConnector.connect(address); + ConnectFuture future = ioConnector.connect(address, protocolHandler); // wait for connection to complete future.join(); // we call getSession which throws an IOException if there has been an error connecting diff --git a/java/client/src/test/java/org/apache/qpid/codec/BasicDeliverTest.java b/java/client/src/test/java/org/apache/qpid/codec/BasicDeliverTest.java index ecbf3ad230..892b349cea 100644 --- a/java/client/src/test/java/org/apache/qpid/codec/BasicDeliverTest.java +++ b/java/client/src/test/java/org/apache/qpid/codec/BasicDeliverTest.java @@ -159,6 +159,11 @@ public class BasicDeliverTest return null; //To change body of implemented methods use File | Settings | File Templates. } + public IoServiceConfig getServiceConfig() + { + return null; + } + public IoHandler getHandler() { return null; //To change body of implemented methods use File | Settings | File Templates. @@ -194,7 +199,7 @@ public class BasicDeliverTest return null; //To change body of implemented methods use File | Settings | File Templates. } - public int getScheduledWriteMessages() + public int getScheduledWriteRequests() { return 0; //To change body of implemented methods use File | Settings | File Templates. } diff --git a/java/client/src/test/java/org/apache/qpid/codec/Client.java b/java/client/src/test/java/org/apache/qpid/codec/Client.java index b015c08afb..c0de5ab133 100644 --- a/java/client/src/test/java/org/apache/qpid/codec/Client.java +++ b/java/client/src/test/java/org/apache/qpid/codec/Client.java @@ -53,11 +53,7 @@ public class Client extends IoHandlerAdapter AMQDataBlock block = BasicDeliverTest.getDataBlock(size); InetSocketAddress address = new InetSocketAddress(host, port); - - SocketConnector ioConnector = new SocketConnector(); - ioConnector.setHandler(this); - ConnectFuture future = ioConnector.connect(address); - + ConnectFuture future = new SocketConnector().connect(address, this); future.join(); _session = future.getSession(); diff --git a/java/client/src/test/java/org/apache/qpid/codec/Server.java b/java/client/src/test/java/org/apache/qpid/codec/Server.java index 2639656e41..fa4295e0b2 100644 --- a/java/client/src/test/java/org/apache/qpid/codec/Server.java +++ b/java/client/src/test/java/org/apache/qpid/codec/Server.java @@ -34,12 +34,7 @@ public class Server extends IoHandlerAdapter { Server(int port) throws Exception { - - SocketAcceptor acceptor = new SocketAcceptor(); - - acceptor.setLocalAddress(new InetSocketAddress(port)); - acceptor.setHandler(this); - acceptor.bind(); + new SocketAcceptor().bind(new InetSocketAddress(port), this); System.out.println("Listening on " + port); } diff --git a/java/client/src/test/java/org/apache/qpid/mina/AcceptorTest.java b/java/client/src/test/java/org/apache/qpid/mina/AcceptorTest.java index a665463736..bae3a60675 100644 --- a/java/client/src/test/java/org/apache/qpid/mina/AcceptorTest.java +++ b/java/client/src/test/java/org/apache/qpid/mina/AcceptorTest.java @@ -27,6 +27,7 @@ import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.transport.socket.nio.SocketAcceptor; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.junit.Test; import org.apache.qpid.pool.ReadWriteThreadModel; @@ -75,18 +76,17 @@ public class AcceptorTest { IoAcceptor acceptor = null; acceptor = new SocketAcceptor(); - - SocketSessionConfig sc = (SocketSessionConfig) acceptor.getSessionConfig(); + + SocketAcceptorConfig config = (SocketAcceptorConfig) acceptor.getDefaultConfig(); + SocketSessionConfig sc = (SocketSessionConfig) config.getSessionConfig(); sc.setTcpNoDelay(true); sc.setSendBufferSize(32768); sc.setReceiveBufferSize(32768); - acceptor.setThreadModel(new ReadWriteThreadModel()); - - acceptor.setLocalAddress(new InetSocketAddress(PORT)); - acceptor.setHandler(new TestHandler()); - acceptor.bind(); + config.setThreadModel(new ReadWriteThreadModel()); + acceptor.bind(new InetSocketAddress(PORT), + new TestHandler()); _logger.info("Bound on port " + PORT); } diff --git a/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java b/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java index 798cde9366..dc29861c87 100644 --- a/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java +++ b/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java @@ -24,6 +24,7 @@ import junit.framework.JUnit4TestAdapter; import org.apache.log4j.Logger; import org.apache.mina.common.*; import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.junit.Test; @@ -180,15 +181,16 @@ public class WriterTest implements Runnable ioConnector = new SocketConnector(); - SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getSessionConfig(); + SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); + cfg.setThreadModel(ThreadModel.MANUAL); + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); scfg.setTcpNoDelay(true); scfg.setSendBufferSize(32768); scfg.setReceiveBufferSize(32768); final InetSocketAddress address = new InetSocketAddress("localhost", AcceptorTest.PORT); _logger.info("Attempting connection to " + address); - ioConnector.setHandler(new WriterHandler()); - ConnectFuture future = ioConnector.connect(address); + ConnectFuture future = ioConnector.connect(address, new WriterHandler()); // wait for connection to complete future.join(); _logger.info("Connection completed"); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/TestIoSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/TestIoSession.java index 50940aa166..e800afc7ba 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/TestIoSession.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/TestIoSession.java @@ -45,6 +45,10 @@ public class TestIoSession extends BaseIoSession { return null; } + public IoServiceConfig getServiceConfig() { + return null; + } + public IoHandler getHandler() { return null; } @@ -69,7 +73,7 @@ public class TestIoSession extends BaseIoSession { return null; } - public int getScheduledWriteMessages() { + public int getScheduledWriteRequests() { return 0; } |
