summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-04-30 19:21:52 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-04-30 19:21:52 +0000
commit02e8e01f5e41a6a3f7123c73abdbe229e37af381 (patch)
tree4e03d7bb972c635167324ed891fec25a76850e5d /java
parent311ac3b34c254109bc74ffa962850fba1d5b85e5 (diff)
downloadqpid-python-02e8e01f5e41a6a3f7123c73abdbe229e37af381.tar.gz
moving the new low level client to the trunk
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@533831 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/newclient/.project17
-rw-r--r--java/newclient/pom.xml194
-rw-r--r--java/newclient/src/main/java/client.log4j29
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java43
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java98
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java56
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java73
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java60
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java47
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxDemarcation.java38
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java39
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java94
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java76
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java49
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java41
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java11
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java78
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java11
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java368
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java286
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java448
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java121
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java224
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java92
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java256
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java130
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java123
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java84
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java77
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java74
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java39
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java454
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java66
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java68
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java8
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java26
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java13
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java50
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java47
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java100
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml37
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java55
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java97
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java20
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java38
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java35
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java63
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java67
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java19
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java188
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java165
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java240
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java122
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java679
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java64
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java16
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java39
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java71
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java30
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java103
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java75
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java46
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java60
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java105
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java62
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java344
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java412
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java73
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java77
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java83
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java34
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java36
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java266
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java135
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java14
75 files changed, 8178 insertions, 0 deletions
diff --git a/java/newclient/.project b/java/newclient/.project
new file mode 100644
index 0000000000..4d42311982
--- /dev/null
+++ b/java/newclient/.project
@@ -0,0 +1,17 @@
+<projectDescription>
+ <name>qpid-newclient</name>
+ <comment/>
+ <projects>
+ <project>qpid-broker</project>
+ <project>qpid-common</project>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments/>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription> \ No newline at end of file
diff --git a/java/newclient/pom.xml b/java/newclient/pom.xml
new file mode 100644
index 0000000000..5a82feb1d2
--- /dev/null
+++ b/java/newclient/pom.xml
@@ -0,0 +1,194 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-newclient</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ <name>Qpid New Client</name>
+ <url>http://cwiki.apache.org/confluence/display/qpid</url>
+
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid</artifactId>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <topDirectoryLocation>..</topDirectoryLocation>
+ <java.source.version>1.5</java.source.version>
+ <qpid.version>${pom.version}</qpid.version>
+ <qpid.targetDir>${project.build.directory}</qpid.targetDir>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>1.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.mina</groupId>
+ <artifactId>mina-filter-ssl</artifactId>
+ </dependency>
+
+ <!-- Test Dependencies -->
+ <dependency> <!-- for inVm Broker -->
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>jmscts</groupId>
+ <artifactId>jmscts</artifactId>
+ <version>0.5-b2</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymockclassextension</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>amqj.noAutoCreateVMBroker</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>amqj.logging.level</name>
+ <value>${amqj.logging.level}</value>
+ </property>
+ <property>
+ <name>log4j.configuration</name>
+ <value>file:///${basedir}/src/main/java/client.log4j</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+
+<!-- The inclusion of this resource causes the build to hang. -->
+ <!--resources>
+ <resource>
+ <targetPath>META-INF/</targetPath>
+ <filtering>false</filtering>
+ <directory>../resources/META-INF</directory>
+ <includes>
+ <include>**</include>
+ </includes>
+ </resource>
+ </resources-->
+
+ <testResources>
+ <testResource>
+ <targetPath>META-INF/</targetPath>
+ <filtering>false</filtering>
+ <directory>../resources/META-INF</directory>
+ <includes>
+ <include>**</include>
+ </includes>
+ </testResource>
+ <testResource>
+ <targetPath>src/</targetPath>
+ <filtering>false</filtering>
+ <directory>src/test/java</directory>
+ <includes>
+ <include>**/*.java</include>
+ </includes>
+ </testResource>
+
+ <testResource>
+ <targetPath></targetPath>
+ <filtering>false</filtering>
+ <directory>src/main/java</directory>
+ <includes>
+ <include>client.log4j</include>
+ </includes>
+ </testResource>
+ </testResources>
+
+ </build>
+
+</project>
diff --git a/java/newclient/src/main/java/client.log4j b/java/newclient/src/main/java/client.log4j
new file mode 100644
index 0000000000..c2c7326479
--- /dev/null
+++ b/java/newclient/src/main/java/client.log4j
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=DEBUG
+
+
+#log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.logger.org.apache.qpid=DEBUG, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java
new file mode 100644
index 0000000000..a733fb7db7
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+
+public abstract class AMQPCallBack
+{
+ private boolean _isComplete = false;
+
+ public abstract void brokerResponded(AMQMethodBody body);
+
+ public abstract void brokerRespondedWithError(AMQException e);
+
+ public void setIsComplete(boolean isComplete)
+ {
+ _isComplete = isComplete;
+ }
+
+ public boolean isComplete()
+ {
+ return _isComplete;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
new file mode 100644
index 0000000000..f984e812c2
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import java.security.SecureRandom;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+public abstract class AMQPCallBackSupport
+{
+ private SecureRandom _localCorrelationIdGenerator = new SecureRandom();
+ protected ConcurrentHashMap<Long,AMQPCallBack> _cbMap = new ConcurrentHashMap<Long,AMQPCallBack>();
+
+ //the channelId assigned for this instance
+ protected int _channelId;
+
+ public AMQPCallBackSupport(int channelId)
+ {
+ _channelId = channelId;
+ }
+
+ private long getNextCorrelationId()
+ {
+ return _localCorrelationIdGenerator.nextLong();
+ }
+
+
+ // For methods that still use nowait, hopefully they will remove nowait
+ protected AMQPMethodEvent handleNoWait(boolean noWait,AMQMethodBody methodBody,AMQPCallBack cb)
+ {
+ if(noWait)
+ {
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID);
+ return msg;
+ }
+ else
+ {
+ // u only need to register if u are expecting a response
+ long localCorrelationId = getNextCorrelationId();
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
+ _cbMap.put(localCorrelationId, cb);
+ return msg;
+ }
+ }
+
+ protected AMQPMethodEvent handleAsynchronousCall(AMQMethodBody methodBody,AMQPCallBack cb)
+ {
+ long localCorrelationId = getNextCorrelationId();
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
+ _cbMap.put(localCorrelationId, cb);
+ return msg;
+ }
+
+ protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody)throws AMQPException
+ {
+ if(_cbMap.containsKey(localCorrelationId))
+ {
+ AMQPCallBack cb = (AMQPCallBack)_cbMap.get(localCorrelationId);
+ if(cb == null)
+ {
+ throw new AMQPException("Unable to find the callback object responsible for handling " + methodBody);
+ }
+ else
+ {
+ cb.setIsComplete(true);
+ cb.brokerResponded(methodBody);
+ }
+ _cbMap.remove(localCorrelationId);
+ }
+ else
+ {
+ //ignore, as this event is for another class instance
+ }
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
new file mode 100644
index 0000000000..cbb60b130d
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ChannelResumeBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPChannel
+{
+
+ /**
+ * Opens the channel
+ */
+ public abstract ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException;
+
+ /**
+ * Close the channel
+ */
+ public abstract ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException;
+
+ /**
+ * Channel Flow
+ */
+ public abstract ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException;
+
+ /**
+ * Close the channel
+ */
+ public abstract ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException;
+
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
new file mode 100644
index 0000000000..4976daa4fa
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.qpid.QpidAMQPChannel;
+import org.apache.qpid.nclient.amqp.qpid.QpidAMQPExchange;
+import org.apache.qpid.nclient.amqp.qpid.QpidAMQPMessage;
+import org.apache.qpid.nclient.amqp.qpid.QpidAMQPQueue;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.transport.ConnectionURL;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
+import org.apache.qpid.url.URLSyntaxException;
+
+public interface AMQPClassFactory
+{
+
+ public abstract AMQPConnection createConnectionClass(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException;
+
+ public abstract AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException;
+
+ public abstract AMQPChannel createChannelClass(int channel) throws AMQPException;
+
+ public abstract void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException;
+
+ public abstract AMQPExchange createExchangeClass(int channel) throws AMQPException;
+
+ public abstract void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException;
+
+ public abstract AMQPQueue createQueueClass(int channel) throws AMQPException;
+
+ public abstract void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException;
+
+ public abstract AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException;
+
+ public abstract void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException;
+
+ /**
+ * Extention point
+ * Other interested parties can obtain a reference to the event manager
+ * and add listeners to get notified of events
+ *
+ */
+ public abstract AMQPEventManager getEventManager();
+
+ /**
+ * Extention point
+ * Other interested parties can obtain a reference to the state manager
+ * and add listeners to get notified of state changes
+ *
+ */
+ public abstract AMQPStateManager getStateManager();
+
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
new file mode 100644
index 0000000000..b18fed5605
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPConnection
+{
+
+ /**
+ * Opens the TCP connection and let the formalities begin.
+ */
+ public abstract ConnectionStartBody openTCPConnection() throws AMQPException;
+
+ /**
+ * The current java broker implementation can send a connection tune body
+ * as a response to the startOk. Not sure if that is the correct behaviour.
+ */
+ public abstract AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException;
+
+ /**
+ * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
+ * issue a new challenge
+ */
+ public abstract AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException;
+
+ public abstract void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException;
+
+ public abstract ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException;
+
+ public abstract ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException;
+
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java
new file mode 100644
index 0000000000..53e11c48fb
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.DtxCoordinationCommitBody;
+import org.apache.qpid.framing.DtxCoordinationForgetBody;
+import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody;
+import org.apache.qpid.framing.DtxCoordinationPrepareBody;
+import org.apache.qpid.framing.DtxCoordinationRecoverBody;
+import org.apache.qpid.framing.DtxCoordinationRollbackBody;
+import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPDtxCoordination
+{
+ public void commit(DtxCoordinationCommitBody dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException;
+
+ public void forget(DtxCoordinationForgetBody dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException;
+
+ public void getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException;
+
+ public void prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException;
+
+ public void recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException;
+
+ public void rollback(DtxCoordinationRollbackBody dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException;
+
+ public void setTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxDemarcation.java
new file mode 100644
index 0000000000..41f1414205
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxDemarcation.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.DtxDemarcationEndBody;
+import org.apache.qpid.framing.DtxDemarcationEndOkBody;
+import org.apache.qpid.framing.DtxDemarcationSelectBody;
+import org.apache.qpid.framing.DtxDemarcationSelectOkBody;
+import org.apache.qpid.framing.DtxDemarcationStartBody;
+import org.apache.qpid.framing.DtxDemarcationStartOkBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPDtxDemarcation
+{
+ public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException;
+
+ public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException;
+
+ public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
new file mode 100644
index 0000000000..c2f93b8f42
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeleteBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPExchange
+{
+
+ /**
+ * -----------------------------------------------
+ * API Methods
+ * -----------------------------------------------
+ */
+ public abstract void declare(ExchangeDeclareBody exchangeDeclareBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void delete(ExchangeDeleteBody exchangeDeleteBody, AMQPCallBack cb) throws AMQPException;
+
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
new file mode 100644
index 0000000000..61c5825fe0
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageOffsetBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageQosBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPMessage
+{
+
+ /**
+ * -----------------------------------------------
+ * API Methods
+ * -----------------------------------------------
+ */
+
+ public abstract void transfer(MessageTransferBody messageTransferBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void consume(MessageConsumeBody messageConsumeBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void cancel(MessageCancelBody messageCancelBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void get(MessageGetBody messageGetBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void recover(MessageRecoverBody messageRecoverBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void open(MessageOpenBody messageOpenBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void close(MessageCloseBody messageCloseBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void append(MessageAppendBody messageAppendBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void checkpoint(MessageCheckpointBody messageCheckpointBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void resume(MessageResumeBody messageResumeBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void qos(MessageQosBody messageQosBody, AMQPCallBack cb) throws AMQPException;
+
+ /**
+ * The correlationId from the request.
+ * For example if a message.transfer is sent with correlationId "ABCD"
+ * then u need to pass that in. This correlation id is used by the execution layer
+ * to handle the correlation of method requests and responses
+ */
+ public abstract void ok(MessageOkBody messageOkBody, long correlationId) throws AMQPException;
+
+ /**
+ * The correlationId from the request.
+ * For example if a message.transfer is sent with correlationId "ABCD"
+ * then u need to pass that in. This correlation id is used by the execution layer
+ * to handle the correlation of method requests and responses
+ */
+ public abstract void reject(MessageRejectBody messageRejectBody, long correlationId) throws AMQPException;
+
+ /**
+ * The correlationId from the request.
+ * For example if a message.resume is sent with correlationId "ABCD"
+ * then u need to pass that in. This correlation id is used by the execution layer
+ * to handle the correlation of method requests and responses
+ */
+ public abstract void offset(MessageOffsetBody messageOffsetBody, long correlationId) throws AMQPException;
+
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java
new file mode 100644
index 0000000000..c10d6975c6
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+/**
+ * This class also represents the AMQP Message class.
+ * You need an instance per channel.
+ * This is passed in as an argument in the constructor of an AMQPMessage instance.
+ * A client who implements this interface is notified When the broker issues
+ * Message class methods on the client.
+ *
+ * A Client should use the AMQPMessage class when it wants to issue Message class
+ * methods on the broker.
+ *
+ * A JMS MessageConsumer implementation can implement this interface and map
+ * AMQP Method notifications to the appropriate JMS methods.
+ *
+ * Simillarly a JMS MessageProducer implementation can wrap an AMQPMessage instance.
+ *
+ */
+
+public interface AMQPMessageCallBack
+{
+ /**
+ * -----------------------------------------------------------------------
+ * This provides Notifications for broker initiated Message class methods.
+ * All methods have a correlationId that u need to pass into
+ * the corresponding Message methods when responding to the broker.
+ *
+ * For example the correlationID passed in from Message.trasnfer
+ * should be passed back when u call Message.ok in AMQPMessage
+ * -----------------------------------------------------------------------
+ */
+
+
+ public void transfer(MessageTransferBody messageTransferBody,long correlationId) throws AMQPException;
+
+ public void recover(MessageRecoverBody messageRecoverBody,long correlationId) throws AMQPException;
+
+ public void open(MessageOpenBody messageOpenBody,long correlationId) throws AMQPException ;
+
+ public void close(MessageCloseBody messageCloseBody,long correlationId) throws AMQPException;
+
+ public void append(MessageAppendBody messageAppendBody,long correlationId) throws AMQPException;
+
+ public void checkpoint(MessageCheckpointBody messageCheckpointBody,long correlationId) throws AMQPException;
+
+ public void resume(MessageResumeBody messageResumeBody,long correlationId) throws AMQPException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
new file mode 100644
index 0000000000..ff26c6adf5
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueuePurgeBody;
+import org.apache.qpid.framing.QueueUnbindBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPQueue
+{
+
+ /**
+ * -----------------------------------------------
+ * API Methods
+ * -----------------------------------------------
+ */
+ public abstract void declare(QueueDeclareBody queueDeclareBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void bind(QueueBindBody queueBindBody, AMQPCallBack cb) throws AMQPException;
+
+ // Queue.unbind doesn't have nowait
+ public abstract void unbind(QueueUnbindBody queueUnbindBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void purge(QueuePurgeBody queuePurgeBody, AMQPCallBack cb) throws AMQPException;
+
+ public abstract void delete(QueueDeleteBody queueDeleteBody, AMQPCallBack cb) throws AMQPException;
+
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java
new file mode 100644
index 0000000000..6d0e83bb7e
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+public class AbstractAMQPClassFactory
+{
+ public static AMQPClassFactory getFactoryInstance() throws AMQPException
+ {
+ String className = ClientConfiguration.get().getString(QpidConstants.AMQP_CLASS_FACTORY);
+ try
+ {
+ return (AMQPClassFactory)Class.forName(className).newInstance();
+ }
+ catch(Exception e)
+ {
+ throw new AMQPException("Error creating AMQPClassFactory",e);
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java
new file mode 100644
index 0000000000..f682659f9e
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java
@@ -0,0 +1,11 @@
+package org.apache.qpid.nclient.amqp.event;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPEventManager
+{
+ public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l);
+ public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l);
+ public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException;
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java
new file mode 100644
index 0000000000..c6641890e0
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java
@@ -0,0 +1,78 @@
+package org.apache.qpid.nclient.amqp.event;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * This class is exactly the same as the AMQMethod event.
+ * Except I renamed requestId to corelationId, so I could use it both ways.
+ *
+ * I didn't want to modify anything in common so that there is no
+ * impact on the existing code.
+ *
+ */
+public class AMQPMethodEvent<M extends AMQMethodBody>
+{
+
+ private final M _method;
+
+ private final int _channelId;
+
+ /**
+ * This is the rquest id from the broker when it sent me a request
+ * when I respond I remember this id and copy this to the outgoing
+ * response.
+ */
+ private final long _correlationId;
+
+ /**
+ * I could use _correlationId, bcos when I send a request
+ * this field is blank and is only used internally. But I
+ * used a seperate field to make it more clear.
+ */
+ private long _localCorrletionId = 0;
+
+ public AMQPMethodEvent(int channelId, M method, long correlationId, long localCorrletionId)
+ {
+ _channelId = channelId;
+ _method = method;
+ _correlationId = correlationId;
+ _localCorrletionId = localCorrletionId;
+ }
+
+ public AMQPMethodEvent(int channelId, M method, long correlationId)
+ {
+ _channelId = channelId;
+ _method = method;
+ _correlationId = correlationId;
+ }
+
+ public M getMethod()
+ {
+ return _method;
+ }
+
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+
+ public long getCorrelationId()
+ {
+ return _correlationId;
+ }
+
+ public long getLocalCorrelationId()
+ {
+ return _localCorrletionId;
+ }
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("Method event: \n");
+ buf.append("Channel id: \n").append(_channelId);
+ buf.append("Method: \n").append(_method);
+ buf.append("Request Id: ").append(_correlationId);
+ buf.append("Local Correlation Id: ").append(_localCorrletionId);
+ return buf.toString();
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java
new file mode 100644
index 0000000000..e77a38121c
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java
@@ -0,0 +1,11 @@
+package org.apache.qpid.nclient.amqp.event;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPMethodListener
+{
+
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException;
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
new file mode 100644
index 0000000000..decb120796
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
@@ -0,0 +1,368 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.nclient.amqp.qpid;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ChannelResumeBody;
+import org.apache.qpid.nclient.amqp.AMQPChannel;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.util.AMQPValidator;
+
+/**
+ * This represents the Channel class defined in the AMQP protocol. This class is a finite state machine and is thread
+ * safe by design. Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. Only
+ * one thread can enter the methods that change state, at a given time. The AMQP protocol recommends one thread per
+ * channel by design.
+ *
+ * A JMS Session can wrap an instance of this class.
+ */
+
+public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListener, AMQPChannel
+{
+ private static final Logger _logger = Logger.getLogger(QpidAMQPChannel.class);
+
+ // the channelId assigned for this channel
+ private int _channelId;
+
+ private Phase _phase;
+
+ private AMQPState _currentState;
+
+ private AMQPStateManager _stateManager;
+
+ private final AMQPState[] _validCloseStates = new AMQPState[]
+ { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
+
+ private final AMQPState[] _validResumeStates = new AMQPState[]
+ { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
+
+ // The wait period until a server sends a respond
+ private long _serverTimeOut = 1000;
+
+ private final Lock _lock = new ReentrantLock();
+
+ private final Condition _channelNotOpend = _lock.newCondition();
+
+ private final Condition _channelNotClosed = _lock.newCondition();
+
+ private final Condition _channelFlowNotResponded = _lock.newCondition();
+
+ private final Condition _channelNotResumed = _lock.newCondition();
+
+ private ChannelOpenOkBody _channelOpenOkBody;
+
+ private ChannelCloseOkBody _channelCloseOkBody;
+
+ private ChannelFlowOkBody _channelFlowOkBody;
+
+ private ChannelOkBody _channelOkBody;
+
+ private ChannelCloseBody _channelCloseBody;
+
+ protected QpidAMQPChannel(int channelId, Phase phase, AMQPStateManager stateManager)
+ {
+ _channelId = channelId;
+ _phase = phase;
+ _stateManager = stateManager;
+ _currentState = AMQPState.CHANNEL_NOT_OPENED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ }
+
+ /**
+ * -------------------------------------------
+ * API Methods
+ * --------------------------------------------
+ */
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPChannel#open(org.apache.qpid.framing.ChannelOpenBody)
+ */
+ public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelOpenOkBody = null;
+ checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotOpend.await();
+ checkIfChannelClosed();
+ AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
+ notifyState(AMQPState.CHANNEL_OPENED);
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _channelOpenOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.open", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPChannel#close(org.apache.qpid.framing.ChannelCloseBody)
+ */
+ public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelCloseOkBody = null;
+ checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotClosed.await();
+ AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
+ notifyState(AMQPState.CHANNEL_CLOSED);
+ _currentState = AMQPState.CHANNEL_CLOSED;
+ return _channelCloseOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.close", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPChannel#flow(org.apache.qpid.framing.ChannelFlowBody)
+ */
+ public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelFlowOkBody = null;
+ if (channelFlowBody.active)
+ {
+ checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED);
+ }
+ else
+ {
+ checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND);
+ }
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelFlowNotResponded.await();
+ checkIfChannelClosed();
+ AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
+ handleChannelFlowState(_channelFlowOkBody.active);
+ return _channelFlowOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.flow", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPChannel#resume(org.apache.qpid.framing.ChannelResumeBody)
+ */
+ public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelOkBody = null;
+ checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotResumed.await();
+ checkIfChannelClosed();
+ AMQPValidator.throwExceptionOnNull(_channelOkBody,
+ "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
+ notifyState(AMQPState.CHANNEL_OPENED);
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _channelOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.resume", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * -------------------------------------------
+ * AMQPMethodListener methods
+ * --------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ if (evt.getMethod() instanceof ChannelOpenOkBody)
+ {
+ _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod();
+ _channelNotOpend.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelCloseOkBody)
+ {
+ _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod();
+ _channelNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelCloseBody)
+ {
+ _channelCloseBody = (ChannelCloseBody) evt.getMethod();
+ // release the correct lock as u may have some conditions waiting.
+ // while an error occured and the broker has sent a close.
+ releaseLocks();
+ handleChannelClose(_channelCloseBody);
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelFlowOkBody)
+ {
+ _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod();
+ _channelFlowNotResponded.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelFlowBody)
+ {
+ handleChannelFlow((ChannelFlowBody) evt.getMethod());
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelOkBody)
+ {
+ _channelOkBody = (ChannelOkBody) evt.getMethod();
+ // In this case the only method expecting channel-ok is channel-resume
+ // haven't implemented ping and pong.
+ _channelNotResumed.signal();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ private void handleChannelClose(ChannelCloseBody channelCloseBody) throws AMQPException
+ {
+ notifyState(AMQPState.CHANNEL_CLOSED);
+ _currentState = AMQPState.CHANNEL_CLOSED;
+ // handle channel related cleanup
+ }
+
+ private void releaseLocks()
+ {
+ if (_currentState == AMQPState.CHANNEL_NOT_OPENED)
+ {
+ _channelNotOpend.signal();
+ _channelNotResumed.signal(); // It could be a channel.resume call
+ }
+ else if (_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND)
+ {
+ _channelFlowNotResponded.signal();
+ }
+ else if (_currentState == AMQPState.CHANNEL_CLOSED)
+ {
+ _channelNotResumed.signal();
+ }
+ }
+
+ private void checkIfChannelClosed() throws AMQPException
+ {
+ if (_channelCloseBody != null)
+ {
+ String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() + " with reply code ("
+ + _channelCloseBody.getReplyCode() + ") " + "caused by class " + _channelCloseBody.getClassId() + " and method "
+ + _channelCloseBody.getMethod();
+
+ throw new AMQPException(error);
+ }
+ }
+
+ private void handleChannelFlow(ChannelFlowBody channelFlowBody)throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ handleChannelFlowState(channelFlowBody.active);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ private void handleChannelFlowState(boolean flow)throws AMQPException
+ {
+ notifyState((flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND);
+ _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
+ }
+
+ private void notifyState(AMQPState newState) throws AMQPException
+ {
+ _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE));
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java
new file mode 100644
index 0000000000..809aa57dab
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java
@@ -0,0 +1,286 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelOkBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageOffsetBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageQosBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.nclient.amqp.AMQPChannel;
+import org.apache.qpid.nclient.amqp.AMQPClassFactory;
+import org.apache.qpid.nclient.amqp.AMQPConnection;
+import org.apache.qpid.nclient.amqp.AMQPExchange;
+import org.apache.qpid.nclient.amqp.AMQPMessage;
+import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
+import org.apache.qpid.nclient.amqp.AMQPQueue;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.DefaultPhaseContext;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.transport.AMQPConnectionURL;
+import org.apache.qpid.nclient.transport.ConnectionURL;
+import org.apache.qpid.nclient.transport.TransportConnection;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * The Class Factory creates AMQP Class
+ * equivalents defined in the spec.
+ *
+ * There should one instance per connection.
+ * The factory class creates all the support
+ * classes and provides an instance of the
+ * AMQP class in ready-to-use state.
+ *
+ */
+public class QpidAMQPClassFactory implements AMQPClassFactory
+{
+ //Need an event manager per connection
+ private AMQPEventManager _eventManager = new QpidEventManager();
+
+ // Need a state manager per connection
+ private AMQPStateManager _stateManager = new QpidStateManager();
+
+ //Need a phase pipe per connection
+ private Phase _phase;
+
+ //One instance per connection
+ private QpidAMQPConnection _amqpConnection;
+
+ public QpidAMQPClassFactory()
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createConnection(java.lang.String, org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType)
+ */
+ public AMQPConnection createConnectionClass(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException
+ {
+ AMQPConnectionURL url = new AMQPConnectionURL(urlStr);
+ return createConnectionClass(url, type);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createConnectionClass(org.apache.qpid.nclient.transport.ConnectionURL, org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType)
+ */
+ public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException
+ {
+ if (_amqpConnection == null)
+ {
+ PhaseContext ctx = new DefaultPhaseContext();
+ ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
+
+ TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx);
+ _amqpConnection = new QpidAMQPConnection(conn, _stateManager);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection);
+ _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection);
+ }
+ return _amqpConnection;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createChannelClass(int)
+ */
+ public AMQPChannel createChannelClass(int channel) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ QpidAMQPChannel amqpChannel = new QpidAMQPChannel(channel, _phase,_stateManager);
+ _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
+ _eventManager.addMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
+ return amqpChannel;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyChannelClass(int, org.apache.qpid.nclient.amqp.AMQPChannel)
+ */
+ public void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createExchangeClass(int)
+ */
+ public AMQPExchange createExchangeClass(int channel) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ QpidAMQPExchange amqpExchange = new QpidAMQPExchange(channel, _phase);
+ _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
+ _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
+ return amqpExchange;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryExchangeClass(int, org.apache.qpid.nclient.amqp.AMQPExchange)
+ */
+ public void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
+ _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createQueueClass(int)
+ */
+ public AMQPQueue createQueueClass(int channel) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ QpidAMQPQueue amqpQueue = new QpidAMQPQueue(channel, _phase);
+ _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
+ _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
+ return amqpQueue;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyQueueClass(int, org.apache.qpid.nclient.amqp.AMQPQueue)
+ */
+ public void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessageCallBack)
+ */
+ public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException
+ {
+ checkIfConnectionStarted();
+ QpidAMQPMessage amqpMessage = new QpidAMQPMessage(channel, _phase, messageCb);
+ _eventManager.addMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageGetBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOkBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageQosBody.class, amqpMessage);
+ _eventManager.addMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
+
+ return amqpMessage;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessage)
+ */
+ public void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
+ }
+
+ //This class should register as a state listener for AMQPConnection
+ private void checkIfConnectionStarted() throws AMQPException
+ {
+ if (_phase == null)
+ {
+ _phase = _amqpConnection.getPhasePipe();
+
+ if (_phase == null)
+ {
+ throw new AMQPException("Cannot create a channel until connection is ready");
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#getEventManager()
+ */
+ public AMQPEventManager getEventManager()
+ {
+ return _eventManager;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#getStateManager()
+ */
+ public AMQPStateManager getStateManager()
+ {
+ return _stateManager;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java
new file mode 100644
index 0000000000..9b4d776cc5
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java
@@ -0,0 +1,448 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.nclient.amqp.AMQPConnection;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.transport.TransportConnection;
+import org.apache.qpid.nclient.util.AMQPValidator;
+
+/**
+ * This maps directly to the Connection class defined in the AMQP protocol This class is a finite state machine and is
+ * thread safe by design A particular method (state changing) can only be invoked once and only in sequence or else an
+ * IllegalStateTransitionException will be thrown Also only one thread can enter those methods at a given time.
+ */
+public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodListener, AMQPConnection
+{
+ private static final Logger _logger = Logger.getLogger(QpidAMQPConnection.class);
+
+ private Phase _phase;
+
+ private TransportConnection _connection;
+
+ private long _correlationId;
+
+ private AMQPState _currentState;
+
+ private AMQPStateManager _stateManager;
+
+ private final AMQPState[] _validCloseStates = new AMQPState[]
+ { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED,
+ AMQPState.CONNECTION_OPEN, };
+
+ // The wait period until a server sends a respond
+ private long _serverTimeOut = 1000;
+
+ private final Lock _lock = new ReentrantLock();
+
+ private final Condition _connectionNotStarted = _lock.newCondition();
+
+ private final Condition _connectionNotSecure = _lock.newCondition();
+
+ private final Condition _connectionNotTuned = _lock.newCondition();
+
+ private final Condition _connectionNotOpened = _lock.newCondition();
+
+ private final Condition _connectionNotClosed = _lock.newCondition();
+
+ private ConnectionStartBody _connectionStartBody;
+
+ private ConnectionSecureBody _connectionSecureBody;
+
+ private ConnectionTuneBody _connectionTuneBody;
+
+ private ConnectionOpenOkBody _connectionOpenOkBody;
+
+ private ConnectionCloseOkBody _connectionCloseOkBody;
+
+ private ConnectionCloseBody _connectionCloseBody;
+
+ protected QpidAMQPConnection(TransportConnection connection, AMQPStateManager stateManager)
+ {
+ _connection = connection;
+ _stateManager = stateManager;
+ _currentState = AMQPState.CONNECTION_UNDEFINED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ }
+
+ /**
+ * -------------------------------------------
+ * API Methods
+ * --------------------------------------------
+ */
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPConnection#openTCPConnection()
+ */
+ public ConnectionStartBody openTCPConnection() throws AMQPException
+ {
+ _lock.lock();
+ // open the TCP connection
+ try
+ {
+ _connectionStartBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
+ _phase = _connection.connect();
+
+ // waiting for ConnectionStartBody or error in connection
+ //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotStarted.await();
+
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
+ notifyState(AMQPState.CONNECTION_NOT_STARTED);
+ _currentState = AMQPState.CONNECTION_NOT_STARTED;
+ return _connectionStartBody;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error opening connection to broker", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPConnection#startOk(org.apache.qpid.framing.ConnectionStartOkBody)
+ */
+ public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _connectionSecureBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
+ _phase.messageSent(msg);
+ // _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS);
+ _connectionNotSecure.await();
+
+ checkIfConnectionClosed();
+ if (_connectionTuneBody != null)
+ {
+ notifyState(AMQPState.CONNECTION_NOT_TUNED);
+ _currentState = AMQPState.CONNECTION_NOT_TUNED;
+ return _connectionTuneBody;
+ }
+ else if (_connectionSecureBody != null)
+ { // oops the server sent another challenge
+ notifyState(AMQPState.CONNECTION_NOT_SECURE);
+ _currentState = AMQPState.CONNECTION_NOT_SECURE;
+ return _connectionSecureBody;
+ }
+ else
+ {
+ throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.startOk", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPConnection#secureOk(org.apache.qpid.framing.ConnectionSecureOkBody)
+ */
+ public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _connectionTuneBody = null;
+ _connectionSecureBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
+
+ _connectionSecureBody = null; // The server could send a fresh challenge
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
+ _phase.messageSent(msg);
+
+ //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotTuned.await();
+ checkIfConnectionClosed();
+
+ if (_connectionTuneBody != null)
+ {
+ notifyState(AMQPState.CONNECTION_NOT_TUNED);
+ _currentState = AMQPState.CONNECTION_NOT_TUNED;
+ return _connectionTuneBody;
+ }
+ else if (_connectionSecureBody != null)
+ { // oops the server sent another challenge
+ notifyState(AMQPState.CONNECTION_NOT_SECURE);
+ _currentState = AMQPState.CONNECTION_NOT_SECURE;
+ return _connectionSecureBody;
+ }
+ else
+ {
+ throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.secureOk", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPConnection#tuneOk(org.apache.qpid.framing.ConnectionTuneOkBody)
+ */
+ public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED);
+ _connectionSecureBody = null;
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId);
+ _phase.messageSent(msg);
+ notifyState(AMQPState.CONNECTION_NOT_OPENED);
+ _currentState = AMQPState.CONNECTION_NOT_OPENED;
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPConnection#open(org.apache.qpid.framing.ConnectionOpenBody)
+ */
+ public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ // If the broker sends a connection close due to an error with the
+ // Connection tune ok, then this call will verify that
+ checkIfConnectionClosed();
+
+ _connectionOpenOkBody = null;
+ checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _connectionNotOpened.await();
+
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
+ notifyState(AMQPState.CONNECTION_OPEN);
+ _currentState = AMQPState.CONNECTION_OPEN;
+ return _connectionOpenOkBody;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.open", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPConnection#close(org.apache.qpid.framing.ConnectionCloseBody)
+ */
+ public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _connectionCloseOkBody = null;
+ checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+ _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
+ notifyState(AMQPState.CONNECTION_CLOSED);
+ _currentState = AMQPState.CONNECTION_CLOSED;
+ return _connectionCloseOkBody;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AMQPException("Error in connection.close", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * -------------------------------------------
+ * AMQMethodListener methods
+ * --------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _correlationId = evt.getCorrelationId();
+
+ if (evt.getMethod() instanceof ConnectionStartBody)
+ {
+ _connectionStartBody = (ConnectionStartBody) evt.getMethod();
+ _connectionNotStarted.signalAll();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionSecureBody)
+ {
+ _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
+ _connectionNotSecure.signal();
+ _connectionNotTuned.signal(); // in case the server has sent another chanllenge
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionTuneBody)
+ {
+ _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
+ _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk
+ _connectionNotTuned.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionOpenOkBody)
+ {
+ _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
+ _connectionNotOpened.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseOkBody)
+ {
+ _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
+ _connectionNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseBody)
+ {
+ _connectionCloseBody = (ConnectionCloseBody) evt.getMethod();
+ // release the correct lock as u may have some conditions waiting.
+ // while an error occured and the broker has sent a close.
+ releaseLocks();
+ handleClose();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+
+ public Phase getPhasePipe()
+ {
+ return _phase;
+ }
+
+ private void handleClose() throws AMQPException
+ {
+ try
+ {
+ notifyState(AMQPState.CONNECTION_CLOSING);
+ _currentState = AMQPState.CONNECTION_CLOSING;
+ // do the required cleanup and send a ConnectionCloseOkBody
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error handling connection.close from broker", e);
+ }
+ }
+
+ private void checkIfConnectionClosed() throws AMQPException
+ {
+ if (_connectionCloseBody != null)
+ {
+ String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + " with reply code ("
+ + _connectionCloseBody.getReplyCode() + ") " + "caused by class " + _connectionCloseBody.getClassId() + " and method "
+ + _connectionCloseBody.getMethod();
+
+ throw new AMQPException(error);
+ }
+ }
+
+ private void releaseLocks()
+ {
+ if (_currentState == AMQPState.CONNECTION_NOT_OPENED)
+ {
+ _connectionNotOpened.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_UNDEFINED)
+ {
+ _connectionNotStarted.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_NOT_STARTED)
+ {
+ _connectionNotSecure.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_NOT_SECURE)
+ {
+ _connectionNotTuned.signal();
+ }
+ }
+
+ private void notifyState(AMQPState newState) throws AMQPException
+ {
+ _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CONNECTION_STATE));
+ }
+
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java
new file mode 100644
index 0000000000..70ea6bd27d
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.DtxCoordinationCommitBody;
+import org.apache.qpid.framing.DtxCoordinationCommitOkBody;
+import org.apache.qpid.framing.DtxCoordinationForgetBody;
+import org.apache.qpid.framing.DtxCoordinationForgetOkBody;
+import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody;
+import org.apache.qpid.framing.DtxCoordinationGetTimeoutOkBody;
+import org.apache.qpid.framing.DtxCoordinationPrepareBody;
+import org.apache.qpid.framing.DtxCoordinationPrepareOkBody;
+import org.apache.qpid.framing.DtxCoordinationRecoverBody;
+import org.apache.qpid.framing.DtxCoordinationRecoverOkBody;
+import org.apache.qpid.framing.DtxCoordinationRollbackBody;
+import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
+import org.apache.qpid.nclient.amqp.AMQPDtxCoordination;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+public class QpidAMQPDtxCoordination extends AMQPCallBackSupport implements AMQPMethodListener, AMQPDtxCoordination
+{
+ private Phase _phase;
+
+ protected QpidAMQPDtxCoordination(int channelId,Phase phase)
+ {
+ super(channelId);
+ _phase = phase;
+ }
+
+ public void commit(DtxCoordinationCommitBody dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationCommitBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void forget(DtxCoordinationForgetBody dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationForgetBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationGetTimeoutBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void rollback(DtxCoordinationRollbackBody dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationRollbackBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationPrepareBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationRecoverBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ public void setTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationSetTimeoutBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /**-------------------------------------------
+ * AMQPMethodListener methods
+ *--------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ long localCorrelationId = evt.getLocalCorrelationId();
+ AMQMethodBody methodBody = evt.getMethod();
+ if ( methodBody instanceof DtxCoordinationCommitOkBody ||
+ methodBody instanceof DtxCoordinationForgetOkBody ||
+ methodBody instanceof DtxCoordinationGetTimeoutOkBody ||
+ methodBody instanceof DtxCoordinationPrepareOkBody ||
+ methodBody instanceof DtxCoordinationRecoverOkBody
+ )
+ {
+ invokeCallBack(localCorrelationId,methodBody);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
new file mode 100644
index 0000000000..fd7b6f8ec6
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
@@ -0,0 +1,224 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.DtxDemarcationEndBody;
+import org.apache.qpid.framing.DtxDemarcationEndOkBody;
+import org.apache.qpid.framing.DtxDemarcationSelectBody;
+import org.apache.qpid.framing.DtxDemarcationSelectOkBody;
+import org.apache.qpid.framing.DtxDemarcationStartBody;
+import org.apache.qpid.framing.DtxDemarcationStartOkBody;
+import org.apache.qpid.nclient.amqp.AMQPDtxDemarcation;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.util.AMQPValidator;
+
+public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMethodListener, AMQPDtxDemarcation
+{
+ private static final Logger _logger = Logger.getLogger(QpidAMQPDtxDemarcation.class);
+
+ // the channelId that will be used for transactions
+ private int _channelId;
+
+ private Phase _phase;
+
+ private AMQPState _currentState;
+
+ private AMQPStateManager _stateManager;
+
+ private final AMQPState[] _validEndStates = new AMQPState[]
+ { AMQPState.DTX_STARTED };
+
+ private final AMQPState[] _validStartStates = new AMQPState[]
+ { AMQPState.DTX_NOT_STARTED, AMQPState.DTX_END };
+
+ // The wait period until a server sends a respond
+ private long _serverTimeOut = 1000;
+
+ private final Lock _lock = new ReentrantLock();
+
+ private final Condition _dtxNotSelected = _lock.newCondition();
+
+ private final Condition _dtxNotStarted = _lock.newCondition();
+
+ // maybe it needs a better name
+ private final Condition _dtxNotEnd = _lock.newCondition();
+
+ private DtxDemarcationSelectOkBody _dtxDemarcationSelectOkBody;
+
+ private DtxDemarcationStartOkBody _dtxDemarcationStartOkBody;
+
+ private DtxDemarcationEndOkBody _dtxDemarcationEndOkBody;
+
+ protected QpidAMQPDtxDemarcation(int channelId, Phase phase, AMQPStateManager stateManager)
+ {
+ _channelId = channelId;
+ _phase = phase;
+ _stateManager = stateManager;
+ _currentState = AMQPState.DTX_CHANNEL_NOT_SELECTED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ }
+
+ /**
+ * -------------------------------------------
+ * API Methods
+ * --------------------------------------------
+ */
+ public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _dtxDemarcationSelectOkBody = null;
+ checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, AMQPState.DTX_NOT_STARTED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, dtxDemarcationSelectBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_dtxNotSelected.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _dtxNotSelected.await();
+ AMQPValidator.throwExceptionOnNull(_dtxDemarcationSelectOkBody, "The broker didn't send the DtxDemarcationSelectOkBody in time");
+ notifyState(AMQPState.DTX_NOT_STARTED);
+ _currentState = AMQPState.DTX_NOT_STARTED;
+ return _dtxDemarcationSelectOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in dtx.select", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _dtxDemarcationStartOkBody = null;
+ checkIfValidStateTransition(_validStartStates, _currentState, AMQPState.DTX_STARTED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationStartOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_dtxNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _dtxNotStarted.await();
+ AMQPValidator.throwExceptionOnNull(_dtxDemarcationStartOkBody, "The broker didn't send the DtxDemarcationStartOkBody in time");
+ notifyState(AMQPState.DTX_STARTED);
+ _currentState = AMQPState.DTX_STARTED;
+ return _dtxDemarcationStartOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in dtx.start", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _dtxDemarcationEndOkBody = null;
+ checkIfValidStateTransition(_validEndStates, _currentState, AMQPState.DTX_END);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationEndOkBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_dtxNotEnd.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _dtxNotEnd.await();
+ AMQPValidator.throwExceptionOnNull(_dtxDemarcationEndOkBody, "The broker didn't send the DtxDemarcationEndOkBody in time");
+ notifyState(AMQPState.DTX_END);
+ _currentState = AMQPState.DTX_END;
+ return _dtxDemarcationEndOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in dtx.start", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * -------------------------------------------
+ * AMQPMethodListener methods
+ * --------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ if (evt.getMethod() instanceof DtxDemarcationSelectOkBody)
+ {
+ _dtxDemarcationEndOkBody = (DtxDemarcationEndOkBody) evt.getMethod();
+ _dtxNotSelected.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof DtxDemarcationStartOkBody)
+ {
+ _dtxDemarcationStartOkBody = (DtxDemarcationStartOkBody) evt.getMethod();
+ _dtxNotStarted.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof DtxDemarcationEndOkBody)
+ {
+ _dtxDemarcationEndOkBody = (DtxDemarcationEndOkBody) evt.getMethod();
+ _dtxNotEnd.signal();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ private void notifyState(AMQPState newState) throws AMQPException
+ {
+ _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE));
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java
new file mode 100644
index 0000000000..02b22e0755
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.ExchangeDeleteBody;
+import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
+import org.apache.qpid.nclient.amqp.AMQPExchange;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+/**
+ *
+ * This class represents the Exchange class defined in AMQP.
+ * Each method takes an @see AMQPCallBack object if it wants to know
+ * the response from the broker to particular method.
+ * Clients can handle the reponse asynchronously or block for a response
+ * using AMQPCallBack.isComplete() periodically using a loop.
+ */
+public class QpidAMQPExchange extends AMQPCallBackSupport implements AMQPMethodListener, AMQPExchange
+{
+ private Phase _phase;
+
+ protected QpidAMQPExchange(int channelId,Phase phase)
+ {
+ super(channelId);
+ _phase = phase;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPExchange#declare(org.apache.qpid.framing.ExchangeDeclareBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void declare(ExchangeDeclareBody exchangeDeclareBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(exchangeDeclareBody.nowait,exchangeDeclareBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPExchange#delete(org.apache.qpid.framing.ExchangeDeleteBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void delete(ExchangeDeleteBody exchangeDeleteBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(exchangeDeleteBody.nowait,exchangeDeleteBody,cb);
+ _phase.messageSent(msg);
+ }
+
+
+ /**-------------------------------------------
+ * AMQPMethodListener methods
+ *--------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ long localCorrelationId = evt.getLocalCorrelationId();
+ AMQMethodBody methodBody = evt.getMethod();
+ if ( methodBody instanceof ExchangeDeclareOkBody || methodBody instanceof ExchangeDeleteOkBody)
+ {
+ invokeCallBack(localCorrelationId,methodBody);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java
new file mode 100644
index 0000000000..e40c3cefa2
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java
@@ -0,0 +1,256 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageEmptyBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageOffsetBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageQosBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
+import org.apache.qpid.nclient.amqp.AMQPMessage;
+import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+/**
+ * This class represents the AMQP Message class.
+ * You need an instance of this class per channel.
+ * A @see AMQPMessageCallBack class is taken as an argument in the constructor.
+ * A client can use this class to issue Message class methods on the broker.
+ * When the broker issues Message class methods on the client, the client is notified
+ * via the AMQPMessageCallBack interface.
+ *
+ * A JMS Message producer implementation can wrap an instance if this and map
+ * JMS method calls to the appropriate AMQP methods.
+ *
+ * AMQPMessageCallBack can be implemented by the JMS MessageConsumer implementation.
+ *
+ */
+public class QpidAMQPMessage extends AMQPCallBackSupport implements AMQPMethodListener, AMQPMessage
+{
+ private Phase _phase;
+ private AMQPMessageCallBack _messageCb;
+
+ protected QpidAMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb)
+ {
+ super(channelId);
+ _phase = phase;
+ _messageCb = messageCb;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#transfer(org.apache.qpid.framing.MessageTransferBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+
+ public void transfer(MessageTransferBody messageTransferBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageTransferBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#consume(org.apache.qpid.framing.MessageConsumeBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void consume(MessageConsumeBody messageConsumeBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageConsumeBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#cancel(org.apache.qpid.framing.MessageCancelBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void cancel(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#get(org.apache.qpid.framing.MessageGetBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void get(MessageGetBody messageGetBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageGetBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#recover(org.apache.qpid.framing.MessageRecoverBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void recover(MessageRecoverBody messageRecoverBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageRecoverBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#open(org.apache.qpid.framing.MessageOpenBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void open(MessageOpenBody messageOpenBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageOpenBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#close(org.apache.qpid.framing.MessageCloseBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void close(MessageCloseBody messageCloseBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageCloseBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#append(org.apache.qpid.framing.MessageAppendBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void append(MessageAppendBody messageAppendBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageAppendBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#checkpoint(org.apache.qpid.framing.MessageCheckpointBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void checkpoint(MessageCheckpointBody messageCheckpointBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageCheckpointBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#resume(org.apache.qpid.framing.MessageResumeBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void resume(MessageResumeBody messageResumeBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageResumeBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#qos(org.apache.qpid.framing.MessageQosBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void qos(MessageQosBody messageQosBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(messageQosBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#ok(org.apache.qpid.framing.MessageOkBody, long)
+ */
+ public void ok(MessageOkBody messageOkBody,long correlationId) throws AMQPException
+ {
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOkBody,correlationId);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#reject(org.apache.qpid.framing.MessageRejectBody, long)
+ */
+ public void reject(MessageRejectBody messageRejectBody,long correlationId) throws AMQPException
+ {
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageRejectBody,correlationId);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPMessage#offset(org.apache.qpid.framing.MessageOffsetBody, long)
+ */
+ public void offset(MessageOffsetBody messageOffsetBody,long correlationId) throws AMQPException
+ {
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOffsetBody,correlationId);
+ _phase.messageSent(msg);
+ }
+
+ /**-------------------------------------------
+ * AMQPMethodListener methods
+ *--------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ long localCorrelationId = evt.getLocalCorrelationId();
+ AMQMethodBody methodBody = evt.getMethod();
+ if ( methodBody instanceof MessageOkBody ||
+ methodBody instanceof MessageRejectBody ||
+ methodBody instanceof MessageEmptyBody)
+ {
+ invokeCallBack(localCorrelationId,methodBody);
+ return true;
+ }
+ else if (methodBody instanceof MessageTransferBody)
+ {
+ _messageCb.transfer((MessageTransferBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageAppendBody)
+ {
+ _messageCb.append((MessageAppendBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageOpenBody)
+ {
+ _messageCb.open((MessageOpenBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageCloseBody)
+ {
+ _messageCb.close((MessageCloseBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageCheckpointBody)
+ {
+ _messageCb.checkpoint((MessageCheckpointBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageRecoverBody)
+ {
+ _messageCb.recover((MessageRecoverBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else if (methodBody instanceof MessageResumeBody)
+ {
+ _messageCb.resume((MessageResumeBody)methodBody, evt.getCorrelationId());
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java
new file mode 100644
index 0000000000..323ff0cf06
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.QueuePurgeBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.QueueUnbindBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
+import org.apache.qpid.nclient.amqp.AMQPQueue;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+/**
+ *
+ * This class represents the Queue class defined in AMQP.
+ * Each method takes an @see AMQPCallBack object if it wants to know
+ * the response from the broker to a particular method.
+ * Clients can handle the reponse asynchronously or block for a response
+ * using AMQPCallBack.isComplete() periodically using a loop.
+ */
+public class QpidAMQPQueue extends AMQPCallBackSupport implements AMQPMethodListener, AMQPQueue
+{
+ private Phase _phase;
+
+ protected QpidAMQPQueue(int channelId,Phase phase)
+ {
+ super(channelId);
+ _phase = phase;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPQueue#declare(org.apache.qpid.framing.QueueDeclareBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void declare(QueueDeclareBody queueDeclareBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(queueDeclareBody.nowait,queueDeclareBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPQueue#bind(org.apache.qpid.framing.QueueBindBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void bind(QueueBindBody queueBindBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(queueBindBody.nowait,queueBindBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ // Queue.unbind doesn't have nowait
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPQueue#unbind(org.apache.qpid.framing.QueueUnbindBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void unbind(QueueUnbindBody queueUnbindBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleAsynchronousCall(queueUnbindBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPQueue#purge(org.apache.qpid.framing.QueuePurgeBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void purge(QueuePurgeBody queuePurgeBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(queuePurgeBody.nowait,queuePurgeBody,cb);
+ _phase.messageSent(msg);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.amqp.AMQPQueue#delete(org.apache.qpid.framing.QueueDeleteBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+ */
+ public void delete(QueueDeleteBody queueDeleteBody,AMQPCallBack cb) throws AMQPException
+ {
+ AMQPMethodEvent msg = handleNoWait(queueDeleteBody.nowait,queueDeleteBody,cb);
+ _phase.messageSent(msg);
+ }
+
+
+ /**-------------------------------------------
+ * AMQPMethodListener methods
+ *--------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ long localCorrelationId = evt.getLocalCorrelationId();
+ AMQMethodBody methodBody = evt.getMethod();
+ if ( methodBody instanceof QueueDeclareOkBody ||
+ methodBody instanceof QueueBindOkBody ||
+ methodBody instanceof QueueUnbindOkBody ||
+ methodBody instanceof QueuePurgeOkBody ||
+ methodBody instanceof QueueDeleteOkBody
+ )
+ {
+ invokeCallBack(localCorrelationId,methodBody);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java
new file mode 100644
index 0000000000..902c80fe77
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+
+/**
+ * This class registeres with the ModelPhase as a AMQMethodListener,
+ * to receive method events and then it distributes methods to other listerners
+ * using a filtering criteria. The criteria is channel id and method body class.
+ * The method listeners are added and removed dynamically
+ *
+ * <p/>
+ */
+public class QpidEventManager implements AMQPEventManager
+{
+ private static final Logger _logger = Logger.getLogger(QpidEventManager.class);
+
+ private Map<Integer, Map> _channelMap = new ConcurrentHashMap<Integer, Map>();
+
+ /**
+ * ------------------------------------------------
+ * methods introduced by AMQEventManager
+ * ------------------------------------------------
+ */
+ public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l)
+ {
+ Map<Class, List> _methodListenerMap;
+ if (_channelMap.containsKey(channelId))
+ {
+ _methodListenerMap = _channelMap.get(channelId);
+
+ }
+ else
+ {
+ _methodListenerMap = new ConcurrentHashMap<Class, List>();
+ _channelMap.put(channelId, _methodListenerMap);
+ }
+
+ List<AMQPMethodListener> _listeners;
+ if (_methodListenerMap.containsKey(clazz))
+ {
+ _listeners = _methodListenerMap.get(clazz);
+ }
+ else
+ {
+ _listeners = new ArrayList<AMQPMethodListener>();
+ _methodListenerMap.put(clazz, _listeners);
+ }
+
+ _listeners.add(l);
+
+ }
+
+ public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l)
+ {
+ if (_channelMap.containsKey(channelId))
+ {
+ Map<Class, List> _methodListenerMap = _channelMap.get(channelId);
+
+ if (_methodListenerMap.containsKey(clazz))
+ {
+ List<AMQPMethodListener> _listeners = _methodListenerMap.get(clazz);
+ _listeners.remove(l);
+ }
+
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.qpid.nclient.model.AMQStateManager#methodReceived(org.apache.qpid.protocol.AMQMethodEvent)
+ */
+ public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ if (_channelMap.containsKey(evt.getChannelId()))
+ {
+ Map<Class, List> _methodListenerMap = _channelMap.get(evt.getChannelId());
+
+ if (_methodListenerMap.containsKey(evt.getMethod().getClass()))
+ {
+
+ List<AMQPMethodListener> _listeners = _methodListenerMap.get(evt.getMethod().getClass());
+ for (AMQPMethodListener l : _listeners)
+ {
+ l.methodReceived(evt);
+ }
+
+ return (_listeners.size() > 0);
+ }
+
+ }
+
+ return false;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java
new file mode 100644
index 0000000000..3fe1ee4cfd
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public class QpidStateManager implements AMQPStateManager
+{
+
+ private static final Logger _logger = Logger.getLogger(QpidStateManager.class);
+
+ private Map<AMQPStateType, List<AMQPStateListener>> _listernerMap = new ConcurrentHashMap<AMQPStateType, List<AMQPStateListener>>();
+
+ public void addListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException
+ {
+ List<AMQPStateListener> list;
+ if(_listernerMap.containsKey(stateType))
+ {
+ list = _listernerMap.get(stateType);
+ }
+ else
+ {
+ list = new ArrayList<AMQPStateListener>();
+ _listernerMap.put(stateType, list);
+ }
+ list.add(l);
+ }
+
+ public void removeListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException
+ {
+ if(_listernerMap.containsKey(stateType))
+ {
+ List<AMQPStateListener> list = _listernerMap.get(stateType);
+ list.remove(l);
+ }
+ }
+
+ public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException
+ {
+
+ if(_listernerMap.containsKey(event.getStateType()))
+ {
+ List<AMQPStateListener> list = _listernerMap.get(event.getStateType());
+ for(AMQPStateListener l: list)
+ {
+ l.stateChanged(event);
+ }
+ }
+ else
+ {
+ _logger.warn("There are no registered listerners for state type" + event.getStateType());
+ }
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java
new file mode 100644
index 0000000000..31a0197ab1
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.sample;
+
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public class MessageHelper implements AMQPMessageCallBack
+{
+
+ public void append(MessageAppendBody messageAppendBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void checkpoint(MessageCheckpointBody messageCheckpointBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void close(MessageCloseBody messageCloseBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void open(MessageOpenBody messageOpenBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void recover(MessageRecoverBody messageRecoverBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void resume(MessageResumeBody messageResumeBody, long correlationId) throws AMQPException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void transfer(MessageTransferBody messageTransferBody, long correlationId) throws AMQPException
+ {
+ System.out.println("The Broker has sent a message" + messageTransferBody.toString());
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java
new file mode 100644
index 0000000000..908f0adee0
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.sample;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.StringTokenizer;
+
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.security.AMQPCallbackHandler;
+import org.apache.qpid.nclient.security.CallbackHandlerRegistry;
+import org.apache.qpid.nclient.transport.ConnectionURL;
+
+public class SecurityHelper
+{
+ public static String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException
+ {
+ final String mechanisms = new String(availableMechanisms, "utf8");
+ StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
+ HashSet mechanismSet = new HashSet();
+ while (tokenizer.hasMoreTokens())
+ {
+ mechanismSet.add(tokenizer.nextToken());
+ }
+
+ String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
+ StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " ");
+ while (prefTokenizer.hasMoreTokens())
+ {
+ String mech = prefTokenizer.nextToken();
+ if (mechanismSet.contains(mech))
+ {
+ return mech;
+ }
+ }
+ return null;
+ }
+
+ public static AMQPCallbackHandler createCallbackHandler(String mechanism, ConnectionURL url)
+ throws AMQPException
+ {
+ Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism);
+ try
+ {
+ Object instance = mechanismClass.newInstance();
+ AMQPCallbackHandler cbh = (AMQPCallbackHandler) instance;
+ cbh.initialise(url);
+ return cbh;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Unable to create callback handler: " + e, e);
+ }
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java
new file mode 100644
index 0000000000..04714d7278
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.sample;
+
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public class StateHelper implements AMQPStateListener
+{
+
+ public void stateChanged(AMQPStateChangedEvent event) throws AMQPException
+ {
+ String s = event.getStateType() + " changed state from " +
+ event.getOldState() + " to " + event.getNewState();
+
+ System.out.println(s);
+
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
new file mode 100644
index 0000000000..3dce1cde1e
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
@@ -0,0 +1,454 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.sample;
+
+import java.util.StringTokenizer;
+import java.util.UUID;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.QueuePurgeBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPChannel;
+import org.apache.qpid.nclient.amqp.AMQPClassFactory;
+import org.apache.qpid.nclient.amqp.AMQPConnection;
+import org.apache.qpid.nclient.amqp.AMQPExchange;
+import org.apache.qpid.nclient.amqp.AMQPMessage;
+import org.apache.qpid.nclient.amqp.AMQPQueue;
+import org.apache.qpid.nclient.amqp.AbstractAMQPClassFactory;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.transport.AMQPConnectionURL;
+import org.apache.qpid.nclient.transport.ConnectionURL;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
+
+/**
+ * This class illustrates the usage of the API
+ * Notes this is just a simple demo.
+ *
+ * I have used Helper classes to keep the code cleaner.
+ * Will break this into unit tests later on
+ */
+
+@SuppressWarnings("unused")
+public class TestClient
+{
+ private byte _major;
+
+ private byte _minor;
+
+ private ConnectionURL _url;
+
+ private static int _channel = 2;
+
+ // Need a Class factory per connection
+ private AMQPClassFactory _classFactory;
+
+ private int _ticket;
+
+ public AMQPConnection openConnection() throws Exception
+ {
+ _classFactory = AbstractAMQPClassFactory.getFactoryInstance();
+
+ //_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'");
+
+ _url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'");
+ return _classFactory.createConnectionClass(_url, ConnectionType.TCP);
+ }
+
+ public void handleConnectionNegotiation(AMQPConnection con) throws Exception
+ {
+ StateHelper stateHelper = new StateHelper();
+ _classFactory.getStateManager().addListener(AMQPStateType.CONNECTION_STATE, stateHelper);
+ _classFactory.getStateManager().addListener(AMQPStateType.CHANNEL_STATE, stateHelper);
+
+ //ConnectionStartBody
+ ConnectionStartBody connectionStartBody = con.openTCPConnection();
+ _major = connectionStartBody.getMajor();
+ _minor = connectionStartBody.getMinor();
+
+ FieldTable clientProperties = FieldTableFactory.newFieldTable();
+ clientProperties.put(new AMQShortString(ClientProperties.instance.toString()), "Test"); // setting only the client id
+
+ final String locales = new String(connectionStartBody.getLocales(), "utf8");
+ final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
+
+ final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms());
+
+ SaslClient sc = Sasl.createSaslClient(new String[]
+ { mechanism }, null, "AMQP", "localhost", null, SecurityHelper.createCallbackHandler(mechanism, _url));
+
+ ConnectionStartOkBody connectionStartOkBody = ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, new AMQShortString(
+ tokenizer.nextToken()), new AMQShortString(mechanism), (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null));
+ // ConnectionSecureBody
+ AMQMethodBody body = con.startOk(connectionStartOkBody);
+ ConnectionTuneBody connectionTuneBody;
+
+ if (body instanceof ConnectionSecureBody)
+ {
+ ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody) body;
+ ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(_major, _minor, sc
+ .evaluateChallenge(connectionSecureBody.getChallenge()));
+ //Assuming the server is not going to send another challenge
+ connectionTuneBody = (ConnectionTuneBody) con.secureOk(connectionSecureOkBody);
+
+ }
+ else
+ {
+ connectionTuneBody = (ConnectionTuneBody) body;
+ }
+
+ // Using broker supplied values
+ ConnectionTuneOkBody connectionTuneOkBody = ConnectionTuneOkBody.createMethodBody(_major, _minor, connectionTuneBody.getChannelMax(),
+ connectionTuneBody.getFrameMax(), connectionTuneBody.getHeartbeat());
+ con.tuneOk(connectionTuneOkBody);
+
+ ConnectionOpenBody connectionOpenBody = ConnectionOpenBody.createMethodBody(_major, _minor, null, true, new AMQShortString(_url
+ .getVirtualHost()));
+
+ ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody);
+ }
+
+ public void handleChannelNegotiation() throws Exception
+ {
+ AMQPChannel channel = _classFactory.createChannelClass(_channel);
+
+ ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1"));
+ ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody);
+
+ //lets have some fun
+ ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false);
+
+ ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody);
+ System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend"));
+
+ channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true);
+ channelFlowOkBody = channel.flow(channelFlowBody);
+ System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend"));
+ }
+
+ public void createExchange() throws Exception
+ {
+ AMQPExchange exchange = _classFactory.createExchangeClass(_channel);
+
+ ExchangeDeclareBody exchangeDeclareBody = ExchangeDeclareBody.createMethodBody(_major, _minor, null, // arguments
+ false,//auto delete
+ false,// durable
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME), true, //internal
+ false,// nowait
+ false,// passive
+ _ticket, new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
+
+ AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange");
+ exchange.declare(exchangeDeclareBody, cb);
+ // Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+ }
+
+ public void createAndBindQueue() throws Exception
+ {
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueueDeclareBody queueDeclareBody = QueueDeclareBody.createMethodBody(_major, _minor, null, //arguments
+ false,//auto delete
+ false,// durable
+ false, //exclusive,
+ false, //nowait,
+ false, //passive,
+ new AMQShortString("MyTestQueue"), 0);
+
+ AMQPCallBack cb = new AMQPCallBack()
+ {
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody) body;
+ System.out.println("[Broker has created the queue, " + "message count " + queueDeclareOkBody.getMessageCount() + "consumer count "
+ + queueDeclareOkBody.getConsumerCount() + "]\n");
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.declare(queueDeclareBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+
+ QueueBindBody queueBindBody = QueueBindBody.createMethodBody(_major, _minor, null, //arguments
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange
+ false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ new AMQShortString("RH"), //routingKey
+ 0 //ticket
+ );
+
+ cb = createCallBackWithMessage("Broker has bound the queue");
+ queue.bind(queueBindBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+ }
+
+ public void purgeQueue() throws Exception
+ {
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueuePurgeBody queuePurgeBody = QueuePurgeBody.createMethodBody(_major, _minor, false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = new AMQPCallBack()
+ {
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody) body;
+ System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n");
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.purge(queuePurgeBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+
+ }
+
+ public void deleteQueue() throws Exception
+ {
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueueDeleteBody queueDeleteBody = QueueDeleteBody.createMethodBody(_major, _minor, false, //ifEmpty
+ false, //ifUnused
+ false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = new AMQPCallBack()
+ {
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody) body;
+ System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n");
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.delete(queueDeleteBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+
+ }
+
+ public void publishAndSubscribe() throws Exception
+ {
+ AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper());
+ MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor, new AMQShortString("myClient"),// destination
+ false, //exclusive
+ null, //filter
+ false, //noAck,
+ false, //noLocal,
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume");
+ message.consume(messageConsumeBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+
+ // Sending 5 messages serially
+ for (int i = 0; i < 5; i++)
+ {
+ cb = createCallBackWithMessage("Broker has accepted msg " + i);
+ message.transfer(createMessages("Test" + i), cb);
+ while (!cb.isComplete())
+ {
+ }
+ }
+
+ MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient"));
+
+ AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel");
+ message.cancel(messageCancelBody, cb2);
+
+ }
+
+ private MessageTransferBody createMessages(String content) throws Exception
+ {
+ FieldTable headers = FieldTableFactory.newFieldTable();
+ headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + "");
+
+ MessageTransferBody messageTransferBody = MessageTransferBody.createMethodBody(_major, _minor, new AMQShortString("testApp"), //appId
+ headers, //applicationHeaders
+ new Content(Content.TypeEnum.INLINE_T, content.getBytes()), //body
+ new AMQShortString(""), //contentEncoding,
+ new AMQShortString("text/plain"), //contentType
+ new AMQShortString("testApp"), //correlationId
+ (short) 1, //deliveryMode non persistant
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
+ 0l, //expiration
+ false, //immediate
+ false, //mandatory
+ new AMQShortString(UUID.randomUUID().toString()), //messageId
+ (short) 0, //priority
+ false, //redelivered
+ new AMQShortString("RH"), //replyTo
+ new AMQShortString("RH"), //routingKey,
+ "abc".getBytes(), //securityToken
+ 0, //ticket
+ System.currentTimeMillis(), //timestamp
+ new AMQShortString(""), //transactionId
+ 0l, //ttl,
+ new AMQShortString("Hello") //userId
+ );
+
+ return messageTransferBody;
+
+ }
+
+ public void publishAndGet() throws Exception
+ {
+ AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper());
+ AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5");
+
+ MessageGetBody messageGetBody = MessageGetBody.createMethodBody(_major, _minor, new AMQShortString("myClient"), false, //noAck
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper());
+ message.transfer(createMessages("Test"), cb);
+ while (!cb.isComplete())
+ {
+ }
+
+ cb = createCallBackWithMessage("Broker has accepted get");
+ message.get(messageGetBody, cb);
+ }
+
+ // Creates a gneric call back and prints the given message
+ private AMQPCallBack createCallBackWithMessage(final String msg)
+ {
+ AMQPCallBack cb = new AMQPCallBack()
+ {
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ System.out.println(msg);
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ return cb;
+ }
+
+ public static void main(String[] args)
+ {
+ TestClient test = new TestClient();
+ try
+ {
+ AMQPConnection con = test.openConnection();
+ test.handleConnectionNegotiation(con);
+ test.handleChannelNegotiation();
+ test.createExchange();
+ test.createAndBindQueue();
+ test.publishAndSubscribe();
+ test.purgeQueue();
+ test.publishAndGet();
+ test.deleteQueue();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
new file mode 100644
index 0000000000..18219abc46
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.state;
+
+/**
+ * States used in the AMQ protocol. Used by the finite state machine to determine
+ * valid responses.
+ */
+public class AMQPState
+{
+ private final int _id;
+
+ private final String _name;
+
+ private AMQPState(int id, String name)
+ {
+ _id = id;
+ _name = name;
+ }
+
+ public String toString()
+ {
+ //return "AMQState: id = " + _id + " name: " + _name;
+ return _name; // looks better with loggin
+ }
+
+ // Connection state
+ public static final AMQPState CONNECTION_UNDEFINED = new AMQPState(0, "CONNECTION_UNDEFINED");
+ public static final AMQPState CONNECTION_NOT_STARTED = new AMQPState(1, "CONNECTION_NOT_STARTED");
+ public static final AMQPState CONNECTION_NOT_SECURE = new AMQPState(2, "CONNECTION_NOT_SECURE");
+ public static final AMQPState CONNECTION_NOT_TUNED = new AMQPState(2, "CONNECTION_NOT_TUNED");
+ public static final AMQPState CONNECTION_NOT_OPENED = new AMQPState(3, "CONNECTION_NOT_OPENED");
+ public static final AMQPState CONNECTION_OPEN = new AMQPState(4, "CONNECTION_OPEN");
+ public static final AMQPState CONNECTION_CLOSING = new AMQPState(5, "CONNECTION_CLOSING");
+ public static final AMQPState CONNECTION_CLOSED = new AMQPState(6, "CONNECTION_CLOSED");
+
+ // Channel state
+ public static final AMQPState CHANNEL_NOT_OPENED = new AMQPState(10, "CHANNEL_NOT_OPENED");
+ public static final AMQPState CHANNEL_OPENED = new AMQPState(11, "CHANNEL_OPENED");
+ public static final AMQPState CHANNEL_CLOSED = new AMQPState(11, "CHANNEL_CLOSED");
+ public static final AMQPState CHANNEL_SUSPEND = new AMQPState(11, "CHANNEL_SUSPEND");
+
+ // Distributed Transaction state
+ public static final AMQPState DTX_CHANNEL_NOT_SELECTED = new AMQPState(10, "DTX_CHANNEL_NOT_SELECTED");
+ public static final AMQPState DTX_NOT_STARTED = new AMQPState(10, "DTX_NOT_STARTED");
+ public static final AMQPState DTX_STARTED = new AMQPState(10, "DTX_STARTED");
+ public static final AMQPState DTX_END = new AMQPState(10, "DTX_END");
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java
new file mode 100644
index 0000000000..506d267a9e
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.state;
+
+public class AMQPStateChangedEvent
+{
+ private AMQPState _oldState;
+
+ private AMQPState _newState;
+
+ private AMQPStateType _stateType;
+
+ public AMQPStateChangedEvent(AMQPState oldState, AMQPState newState, AMQPStateType stateType)
+ {
+ _oldState = oldState;
+ _newState = newState;
+ _stateType = stateType;
+ }
+
+ public AMQPState getNewState()
+ {
+ return _newState;
+ }
+
+ public void setNewState(AMQPState newState)
+ {
+ this._newState = newState;
+ }
+
+ public AMQPState getOldState()
+ {
+ return _oldState;
+ }
+
+ public void setOldState(AMQPState oldState)
+ {
+ this._oldState = oldState;
+ }
+
+ public AMQPStateType getStateType()
+ {
+ return _stateType;
+ }
+
+ public void setStateType(AMQPStateType stateType)
+ {
+ this._stateType = stateType;
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java
new file mode 100644
index 0000000000..974f707504
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java
@@ -0,0 +1,8 @@
+package org.apache.qpid.nclient.amqp.state;
+
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPStateListener
+{
+ public void stateChanged(AMQPStateChangedEvent event) throws AMQPException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java
new file mode 100644
index 0000000000..c1fde7181d
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java
@@ -0,0 +1,26 @@
+package org.apache.qpid.nclient.amqp.state;
+
+import org.apache.qpid.nclient.core.AMQPException;
+
+public class AMQPStateMachine
+{
+ protected void checkIfValidStateTransition(AMQPState correctState,AMQPState currentState,AMQPState requiredState) throws IllegalStateTransitionException
+ {
+ if (currentState != correctState)
+ {
+ throw new IllegalStateTransitionException(currentState,requiredState);
+ }
+ }
+
+ protected void checkIfValidStateTransition(AMQPState[] correctStates,AMQPState currentState,AMQPState requiredState) throws IllegalStateTransitionException
+ {
+ for(AMQPState correctState :correctStates)
+ {
+ if (currentState == correctState)
+ {
+ return;
+ }
+ }
+ throw new IllegalStateTransitionException(currentState,requiredState);
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
new file mode 100644
index 0000000000..9bc60b658e
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
@@ -0,0 +1,13 @@
+package org.apache.qpid.nclient.amqp.state;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPStateManager
+{
+ public void addListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException;
+
+ public void removeListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException;
+
+ public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException;
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java
new file mode 100644
index 0000000000..e190d639f2
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.state;
+
+/**
+ * The Type of States used in the AMQ protocol.
+ * This allows to partition listeners by the type of states they want
+ * to listen rather than all.
+ * For example an Object might only be interested in Channel state
+ */
+public class AMQPStateType
+{
+ private final int _typeId;
+
+ private final String _typeName;
+
+ private AMQPStateType(int id, String name)
+ {
+ _typeId = id;
+ _typeName = name;
+ }
+
+ public String toString()
+ {
+ return _typeName;
+ }
+
+ // Connection state
+ public static final AMQPStateType CONNECTION_STATE = new AMQPStateType(0, "CONNECTION_STATE");
+ public static final AMQPStateType CHANNEL_STATE = new AMQPStateType(1, "CHANNEL_STATE");
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java
new file mode 100644
index 0000000000..fdc24d1d2f
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.state;
+
+import org.apache.qpid.nclient.core.AMQPException;
+
+public class IllegalStateTransitionException extends AMQPException
+{
+ private AMQPState _currentState;
+ private AMQPState _desiredState;
+
+ public IllegalStateTransitionException(AMQPState currentState, AMQPState desiredState)
+ {
+ super("No valid state transition defined from state " + currentState +
+ " to state " + desiredState);
+ _currentState = currentState;
+ _desiredState = desiredState;
+ }
+
+ public AMQPState getCurrentState()
+ {
+ return _currentState;
+ }
+
+ public AMQPState getDesiredState()
+ {
+ return _desiredState;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
new file mode 100644
index 0000000000..7bc77e02c0
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
@@ -0,0 +1,100 @@
+package org.apache.qpid.nclient.config;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.List;
+import java.util.TreeMap;
+
+import javax.security.sasl.SaslClientFactory;
+
+import org.apache.commons.configuration.CombinedConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.SystemConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.security.AMQPCallbackHandler;
+
+/**
+ * Loads a properties file from classpath.
+ * These values can be overwritten using system properties
+ */
+public class ClientConfiguration extends CombinedConfiguration {
+
+ private static final Logger _logger = Logger.getLogger(ClientConfiguration.class);
+ private static ClientConfiguration _instance = new ClientConfiguration();
+
+ ClientConfiguration()
+ {
+ super();
+ addConfiguration(new SystemConfiguration());
+ try
+ {
+ XMLConfiguration config = new XMLConfiguration();
+ config.load(getInputStream());
+ addConfiguration(config);
+ }
+ catch (ConfigurationException e)
+ {
+ _logger.warn("Client Properties missing, using defaults",e);
+ }
+ }
+
+ public static ClientConfiguration get()
+ {
+ return _instance;
+ }
+
+ private InputStream getInputStream()
+ {
+ if (System.getProperty(QpidConstants.CONFIG_FILE_PATH) != null)
+ {
+ try
+ {
+ return new FileInputStream((String)System.getProperty(QpidConstants.CONFIG_FILE_PATH));
+ }
+ catch(Exception e)
+ {
+ return this.getClass().getResourceAsStream("client.xml");
+ }
+ }
+ else
+ {
+ return this.getClass().getResourceAsStream("client.xml");
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ String key = QpidConstants.AMQP_SECURITY + "." +
+ QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES + "." +
+ QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY;
+
+ TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister =
+ new TreeMap<String, Class<? extends SaslClientFactory>>();
+
+ int index = ClientConfiguration.get().getMaxIndex(key);
+
+ for (int i=0; i<index+1;i++)
+ {
+ String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]");
+ String className = ClientConfiguration.get().getString(key + "(" + i + ")" );
+ try
+ {
+ Class<?> clazz = Class.forName(className);
+ if (!(SaslClientFactory.class.isAssignableFrom(clazz)))
+ {
+ _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping");
+ continue;
+ }
+ factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz);
+ }
+ catch (Exception ex)
+ {
+ _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping");
+ }
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml
new file mode 100644
index 0000000000..6eeedbe1ff
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="ISO-8859-1" ?>
+<qpidClientConfig>
+
+ <security>
+ <saslClientFactoryTypes>
+ <saslClientFactory type="AMQPLAIN">org.apache.qpid.nclient.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory>
+ </saslClientFactoryTypes>
+ <securityMechanisms>
+ <securityMechanismHandler type="PLAIN">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
+ <securityMechanismHandler type="CRAM_MD5">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
+ </securityMechanisms>
+ </security>
+
+ <!-- Transport Layer properties -->
+ <useSharedReadWritePool>false</useSharedReadWritePool>
+ <enableDirectBuffers>true</enableDirectBuffers>
+ <enablePooledAllocator>false</enablePooledAllocator>
+ <tcpNoDelay>true</tcpNoDelay>
+ <sendBufferSizeInKb>32</sendBufferSizeInKb>
+ <reciveBufferSizeInKb>32</reciveBufferSizeInKb>
+ <qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass>
+
+ <!-- Execution Layer properties -->
+ <maxAccumilatedResponses>20</maxAccumilatedResponses>
+
+ <!-- Model Phase properties -->
+ <serverTimeoutInMilliSeconds>60000</serverTimeoutInMilliSeconds>
+ <amqpClassFactory>org.apache.qpid.nclient.amqp.qpid.QpidAMQPClassFactory</amqpClassFactory>
+ <maxAccumilatedResponses>20</maxAccumilatedResponses>
+
+ <phasePipe>
+ <phase index="0">org.apache.qpid.nclient.transport.TransportPhase</phase>
+ <phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase</phase>
+ <phase index="2">org.apache.qpid.nclient.model.ModelPhase</phase>
+ </phasePipe>
+
+</qpidClientConfig> \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java
new file mode 100644
index 0000000000..a029f7d4ff
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java
@@ -0,0 +1,55 @@
+package org.apache.qpid.nclient.core;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
+
+public class AMQPException extends Exception
+{
+ private int _errorCode;
+
+ public AMQPException(String message)
+ {
+ super(message);
+ }
+
+ public AMQPException(String msg, Throwable t)
+ {
+ super(msg, t);
+ }
+
+ public AMQPException(int errorCode, String msg, Throwable t)
+ {
+ super(msg + " [error code " + errorCode + ']', t);
+ _errorCode = errorCode;
+ }
+
+ public AMQPException(int errorCode, String msg)
+ {
+ super(msg + " [error code " + errorCode + ']');
+ _errorCode = errorCode;
+ }
+
+ public AMQPException(Logger logger, String msg, Throwable t)
+ {
+ this(msg, t);
+ logger.error(getMessage(), this);
+ }
+
+ public AMQPException(Logger logger, String msg)
+ {
+ this(msg);
+ logger.error(getMessage(), this);
+ }
+
+ public AMQPException(Logger logger, int errorCode, String msg)
+ {
+ this(errorCode, msg);
+ logger.error(getMessage(), this);
+ }
+
+ public int getErrorCode()
+ {
+ return _errorCode;
+ }
+ }
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java
new file mode 100644
index 0000000000..bf6a19b920
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.core;
+
+public abstract class AbstractPhase implements Phase {
+
+ protected PhaseContext _ctx;
+ protected Phase _nextInFlowPhase;
+ protected Phase _nextOutFlowPhase;
+
+
+ /**
+ * ------------------------------------------------
+ * Phase - method introduced by Phase
+ * ------------------------------------------------
+ */
+ public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase) {
+ _nextInFlowPhase = nextInFlowPhase;
+ _nextOutFlowPhase = nextOutFlowPhase;
+ _ctx = ctx;
+ }
+
+ /**
+ * The start is called from the top
+ * of the pipe and is propogated the
+ * bottom.
+ *
+ * Each phase can override this to do
+ * any phase specific logic related
+ * pipe.start()
+ */
+ public void start()throws AMQPException
+ {
+ if(_nextOutFlowPhase != null)
+ {
+ _nextOutFlowPhase.start();
+ }
+ }
+
+ /**
+ * Each phase can override this to do
+ * any phase specific cleanup
+ */
+ public void close()throws AMQPException
+ {
+
+ }
+
+ public void messageReceived(Object frame) throws AMQPException
+ {
+ if(_nextInFlowPhase != null)
+ {
+ _nextInFlowPhase.messageReceived(frame);
+ }
+ }
+
+ public void messageSent(Object frame) throws AMQPException
+ {
+ if (_nextOutFlowPhase != null)
+ {
+ _nextOutFlowPhase.messageSent(frame);
+ }
+ }
+
+ public PhaseContext getPhaseContext()
+ {
+ return _ctx;
+ }
+
+ public Phase getNextInFlowPhase() {
+ return _nextInFlowPhase;
+ }
+
+ public Phase getNextOutFlowPhase() {
+ return _nextOutFlowPhase;
+ }
+
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java
new file mode 100644
index 0000000000..a3455cdacd
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java
@@ -0,0 +1,20 @@
+package org.apache.qpid.nclient.core;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DefaultPhaseContext implements PhaseContext
+{
+ public Map<String,Object> _props = new ConcurrentHashMap<String,Object>();
+
+ public Object getProperty(String name)
+ {
+ return _props.get(name);
+ }
+
+ public void setProperty(String name, Object value)
+ {
+ _props.put(name, value);
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java
new file mode 100644
index 0000000000..7aa10b77ff
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java
@@ -0,0 +1,38 @@
+package org.apache.qpid.nclient.core;
+
+
+public interface Phase
+{
+
+ /**
+ * This method is used to initialize a phase
+ *
+ * @param ctx
+ * @param nextInFlowPhase
+ * @param nextOutFlowPhase
+ */
+ public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase);
+
+ /**
+ *
+ * Implement logic related to physical opening
+ * of the pipe
+ */
+ public void start()throws AMQPException;
+
+ /**
+ * Implement cleanup in this method.
+ * This indicates the pipe is closing
+ */
+ public void close()throws AMQPException;
+
+ public void messageReceived(Object msg) throws AMQPException;
+
+ public void messageSent(Object msg) throws AMQPException;
+
+ public PhaseContext getPhaseContext();
+
+ public Phase getNextOutFlowPhase();
+
+ public Phase getNextInFlowPhase();
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java
new file mode 100644
index 0000000000..d5942fd785
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.core;
+
+/**
+ * This can be thought of as a session context associated
+ * with the pipe. This is transient and is scoped by the
+ * duration of the physical connection.
+ *
+ */
+public interface PhaseContext {
+
+ public Object getProperty(String name);
+
+ public void setProperty(String name, Object value);
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
new file mode 100644
index 0000000000..9542aab344
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
@@ -0,0 +1,63 @@
+package org.apache.qpid.nclient.core;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.nclient.config.ClientConfiguration;
+
+public class PhaseFactory
+{
+ /**
+ * This method will create the pipe and return a reference
+ * to the top of the pipeline.
+ *
+ * The application can then use this (top most) phase and all
+ * calls will propogated down the pipe.
+ *
+ * Simillar calls orginating at the bottom of the pipeline
+ * will be propogated to the top.
+ *
+ * @param ctx
+ * @return
+ * @throws AMQPException
+ */
+ public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
+ {
+ String key = QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE;
+ Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>();
+ List<String> list = ClientConfiguration.get().getList(key);
+ int index = 0;
+ for(String s:list)
+ {
+ try
+ {
+ Phase temp = (Phase)Class.forName(s).newInstance();
+ phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + QpidConstants.INDEX),temp) ;
+ }
+ catch(Exception e)
+ {
+ throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e);
+ }
+ index++;
+ }
+
+ Phase current = null;
+ Phase prev = null;
+ Phase next = null;
+ //Lets build the phase pipe.
+ for (int i=0; i<phaseMap.size();i++)
+ {
+ current = phaseMap.get(i);
+ if (i+1 < phaseMap.size())
+ {
+ next = phaseMap.get(i+1);
+ }
+ current.init(ctx, next, prev);
+ prev = current;
+ next = null;
+ }
+
+ return current;
+ }
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
new file mode 100644
index 0000000000..034fc28070
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
@@ -0,0 +1,67 @@
+package org.apache.qpid.nclient.core;
+
+public interface QpidConstants
+{
+
+ // Common properties
+ public static long EMPTY_CORRELATION_ID = -1;
+
+ public static int CHANNEL_ZERO = 0;
+
+ public static String CONFIG_FILE_PATH = "ConfigFilePath";
+
+ // Phase Context properties
+ public final static String AMQP_BROKER_DETAILS = "AMQP_BROKER_DETAILS";
+
+ public final static String MINA_IO_CONNECTOR = "MINA_IO_CONNECTOR";
+
+ public final static String EVENT_MANAGER = "EVENT_MANAGER";
+
+ /**---------------------------------------------------------------
+ * Configuration file properties
+ * ------------------------------------------------------------
+ */
+
+ // Model Layer properties
+ public final static String SERVER_TIMEOUT_IN_MILLISECONDS = "serverTimeoutInMilliSeconds";
+ public final static String AMQP_CLASS_FACTORY = "amqpClassFactory";
+
+ // MINA properties
+ public final static String USE_SHARED_READ_WRITE_POOL = "useSharedReadWritePool";
+
+ public final static String ENABLE_DIRECT_BUFFERS = "enableDirectBuffers";
+
+ public final static String ENABLE_POOLED_ALLOCATOR = "enablePooledAllocator";
+
+ public final static String TCP_NO_DELAY = "tcpNoDelay";
+
+ public final static String SEND_BUFFER_SIZE_IN_KB = "sendBufferSizeInKb";
+
+ public final static String RECEIVE_BUFFER_SIZE_IN_KB = "reciveBufferSizeInKb";
+
+ // Security properties
+ public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES = "saslClientFactoryTypes";
+
+ public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY = "saslClientFactory";
+
+ public final static String TYPE = "[@type]";
+
+ public final static String AMQP_SECURITY = "security";
+
+ public final static String AMQP_SECURITY_MECHANISMS = "securityMechanisms";
+
+ public final static String AMQP_SECURITY_MECHANISM_HANDLER = "securityMechanismHandler";
+
+ // Execution Layer properties
+ public final static String MAX_ACCUMILATED_RESPONSES = "maxAccumilatedResponses";
+
+ //Transport Layer properties
+ public final static String QPID_VM_BROKER_CLASS = "qpidVMBrokerClass";
+
+ //Phase pipe properties
+ public final static String PHASE_PIPE = "phasePipe";
+
+ public final static String PHASE = "phase";
+
+ public final static String INDEX = "[@index]";
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java
new file mode 100644
index 0000000000..cc1503d414
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java
@@ -0,0 +1,19 @@
+package org.apache.qpid.nclient.core;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class TransientPhaseContext implements PhaseContext {
+
+ private Map map = new ConcurrentHashMap();
+
+ public Object getProperty(String name) {
+ return map.get(name);
+ }
+
+ public void setProperty(String name, Object value) {
+ map.put(name, value);
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
new file mode 100644
index 0000000000..1305500439
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
@@ -0,0 +1,188 @@
+package org.apache.qpid.nclient.execution;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQRequestBody;
+import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.AbstractPhase;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+/**
+ * Corressponds to the Layer 2 in AMQP.
+ * This phase handles the correlation of amqp messages
+ * This class implements the 0.9 spec (request/response)
+ */
+public class ExecutionPhase extends AbstractPhase
+{
+
+ protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class);
+
+ protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap();
+
+ protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap();
+
+ /**
+ * --------------------------------------------------
+ * Phase related methods
+ * --------------------------------------------------
+ */
+
+ // should add these in the init method
+ //_channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
+ //_channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
+ public void messageReceived(Object msg) throws AMQPException
+ {
+ AMQFrame frame = (AMQFrame) msg;
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ if (bodyFrame instanceof AMQRequestBody)
+ {
+ AMQPMethodEvent event;
+ try
+ {
+ event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody) bodyFrame);
+ super.messageReceived(event);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error handling request", e);
+ }
+
+ }
+ else if (bodyFrame instanceof AMQResponseBody)
+ {
+ List<AMQPMethodEvent> events;
+ try
+ {
+ events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody) bodyFrame);
+ for (AMQPMethodEvent event : events)
+ {
+ super.messageReceived(event);
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error handling response", e);
+ }
+ }
+ }
+
+ /**
+ * Need to figure out if the message is a request or a response
+ * that needs to be sent and then delegate it to the Request or response manager
+ * to prepare it.
+ */
+ public void messageSent(Object msg) throws AMQPException
+ {
+ AMQPMethodEvent evt = (AMQPMethodEvent) msg;
+ if (evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID)
+ {
+ // This is a request
+ AMQFrame frame = handleRequest(evt);
+ super.messageSent(frame);
+ }
+ else
+ {
+ // This is a response
+ List<AMQFrame> frames = handleResponse(evt);
+ for (AMQFrame frame : frames)
+ {
+ super.messageSent(frame);
+ }
+ }
+ }
+
+ /**
+ * ------------------------------------------------
+ * Methods to handle request response
+ * -----------------------------------------------
+ */
+ private AMQPMethodEvent messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Request frame received: " + requestBody);
+ }
+
+ ResponseManager responseManager;
+ if(_channelId2ResponseMgrMap.containsKey(channelId))
+ {
+ responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId);
+ }
+ else
+ {
+ responseManager = new ResponseManager(0,channelId,false);
+ _channelId2ResponseMgrMap.put(channelId, responseManager);
+ }
+ return responseManager.requestReceived(requestBody);
+ }
+
+ private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody)
+ throws Exception
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Response frame received: " + responseBody);
+ }
+
+ RequestManager requestManager;
+ if (_channelId2RequestMgrMap.containsKey(channelId))
+ {
+ requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId);
+ }
+ else
+ {
+ requestManager = new RequestManager(0,channelId,false);
+ _channelId2RequestMgrMap.put(channelId, requestManager);
+ }
+
+ return requestManager.responseReceived(responseBody);
+ }
+
+ private AMQFrame handleRequest(AMQPMethodEvent evt)
+ {
+ int channelId = evt.getChannelId();
+ RequestManager requestManager;
+ if (_channelId2RequestMgrMap.containsKey(channelId))
+ {
+ requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId);
+ }
+ else
+ {
+ requestManager = new RequestManager(0,channelId,false);
+ _channelId2RequestMgrMap.put(channelId, requestManager);
+ }
+ return requestManager.sendRequest(evt);
+ }
+
+ private List<AMQFrame> handleResponse(AMQPMethodEvent evt) throws AMQPException
+ {
+ int channelId = evt.getChannelId();
+ ResponseManager responseManager;
+ if(_channelId2ResponseMgrMap.containsKey(channelId))
+ {
+ responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId);
+ }
+ else
+ {
+ responseManager = new ResponseManager(0,channelId,false);
+ _channelId2ResponseMgrMap.put(channelId, responseManager);
+ }
+ try
+ {
+ return responseManager.sendResponse(evt);
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error handling response", e);
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
new file mode 100644
index 0000000000..0084f27717
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
@@ -0,0 +1,165 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.execution;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQRequestBody;
+import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+
+public class RequestManager
+{
+ private static final Logger logger = Logger.getLogger(RequestManager.class);
+
+ private int channel;
+
+ /**
+ * Used for logging and debugging only - allows the context of this instance
+ * to be known.
+ */
+ private boolean serverFlag;
+ private long connectionId;
+
+ /**
+ * Request and response frames must have a requestID and responseID which
+ * indepenedently increment from 0 on a per-channel basis. These are the
+ * counters, and contain the value of the next (not yet used) frame.
+ */
+ private long requestIdCount;
+
+ /**
+ * These keep track of the last requestId and responseId to be received.
+ */
+ private long lastProcessedResponseId;
+
+ private ConcurrentHashMap<Long, CorrelationID> requestSentMap;
+
+ public RequestManager(long connectionId, int channel, boolean serverFlag)
+ {
+ this.channel = channel;
+ this.serverFlag = serverFlag;
+ this.connectionId = connectionId;
+ requestIdCount = 1L;
+ lastProcessedResponseId = 0L;
+ requestSentMap = new ConcurrentHashMap<Long, CorrelationID>();
+ }
+
+ // *** Functions to originate a request ***
+
+ public AMQFrame sendRequest(AMQPMethodEvent evt)
+ {
+ long requestId = getNextRequestId(); // Get new request ID
+ AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId,
+ lastProcessedResponseId, evt.getMethod());
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+ "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + evt.getMethod());
+ }
+ requestSentMap.put(requestId, new CorrelationID(evt.getCorrelationId(), evt.getLocalCorrelationId()));
+ return requestFrame;
+ }
+
+ public List<AMQPMethodEvent> responseReceived(AMQResponseBody responseBody)
+ throws Exception
+ {
+ long requestIdStart = responseBody.getRequestId();
+ long requestIdStop = requestIdStart + responseBody.getBatchOffset();
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " +
+ responseBody + "; " + responseBody.getMethodPayload());
+ }
+
+ List<AMQPMethodEvent> events = new ArrayList<AMQPMethodEvent>();
+ for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
+ {
+ if (requestSentMap.get(requestId) == null)
+ {
+ throw new RequestResponseMappingException(requestId,
+ "Failed to locate requestId " + requestId + " in requestSentMap.");
+ }
+ CorrelationID correlationID = requestSentMap.get(requestId);
+ AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, responseBody.getMethodPayload(),
+ correlationID.getSystemCorrelationID(),correlationID.getLocalCorrelationID());
+ events.add(methodEvent);
+ requestSentMap.remove(requestId);
+ }
+ lastProcessedResponseId = responseBody.getResponseId();
+ return events;
+ }
+
+ // *** Management functions ***
+
+ public int requestsMapSize()
+ {
+ return requestSentMap.size();
+ }
+
+ // *** Private helper functions ***
+
+ private long getNextRequestId()
+ {
+ return requestIdCount++;
+ }
+
+ private class CorrelationID
+ {
+ // Use for the request/response stuff
+ private long _systemCorrelationID;
+ // used internally to track callbacks
+ private long _localCorrelationID;
+
+ CorrelationID(long systemCorrelationID,long localCorrelationID)
+ {
+ _localCorrelationID = localCorrelationID;
+ _systemCorrelationID = systemCorrelationID;
+ }
+
+ public long getLocalCorrelationID()
+ {
+ return _localCorrelationID;
+ }
+
+ public void setLocalCorrelationID(long correlationID)
+ {
+ _localCorrelationID = correlationID;
+ }
+
+ public long getSystemCorrelationID()
+ {
+ return _systemCorrelationID;
+ }
+
+ public void setSystemCorrelationID(long correlationID)
+ {
+ _systemCorrelationID = correlationID;
+ }
+
+
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
new file mode 100644
index 0000000000..c5a75d242f
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
@@ -0,0 +1,240 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.execution;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQRequestBody;
+import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+public class ResponseManager
+{
+ private static final Logger logger = Logger.getLogger(ResponseManager.class);
+
+ private int channel;
+
+ /**
+ * Used for logging and debugging only - allows the context of this instance
+ * to be known.
+ */
+ private boolean serverFlag;
+ private long connectionId;
+
+ private int maxAccumulatedResponses = 20; // Default
+
+ /**
+ * Request and response frames must have a requestID and responseID which
+ * indepenedently increment from 0 on a per-channel basis. These are the
+ * counters, and contain the value of the next (not yet used) frame.
+ */
+ private long responseIdCount;
+
+ /**
+ * These keep track of the last requestId and responseId to be received.
+ */
+ private long lastReceivedRequestId;
+
+ /**
+ * Last requestID sent in a response (for batching)
+ */
+ private long lastSentRequestId;
+
+ private class ResponseStatus implements Comparable<ResponseStatus>
+ {
+ private long requestId;
+ private AMQMethodBody responseMethodBody;
+
+ public ResponseStatus(long requestId)
+ {
+ this.requestId = requestId;
+ responseMethodBody = null;
+ }
+
+ public int compareTo(ResponseStatus o)
+ {
+ return (int)(requestId - o.requestId);
+ }
+
+ public String toString()
+ {
+ // Need to define this
+ return "";
+ }
+ }
+
+ private ConcurrentHashMap<Long, ResponseStatus> responseMap;
+
+ public ResponseManager(long connectionId, int channel, boolean serverFlag)
+ {
+ this.channel = channel;
+ this.serverFlag = serverFlag;
+ this.connectionId = connectionId;
+ responseIdCount = 1L;
+ lastReceivedRequestId = 0L;
+ maxAccumulatedResponses = ClientConfiguration.get().getInt(QpidConstants.MAX_ACCUMILATED_RESPONSES);
+ responseMap = new ConcurrentHashMap<Long, ResponseStatus>();
+ }
+
+ // *** Functions to handle an incoming request ***
+
+ public AMQPMethodEvent requestReceived(AMQRequestBody requestBody) throws Exception
+ {
+ long requestId = requestBody.getRequestId();
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " +
+ requestBody + "; " + requestBody.getMethodPayload());
+ }
+ long responseMark = requestBody.getResponseMark();
+ lastReceivedRequestId = requestId;
+ responseMap.put(requestId, new ResponseStatus(requestId));
+
+ AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel,
+ requestBody.getMethodPayload(), requestId);
+
+ return methodEvent;
+ }
+
+ public List<AMQFrame> sendResponse(AMQPMethodEvent evt)
+ throws RequestResponseMappingException
+ {
+ long requestId = evt.getCorrelationId();
+ AMQMethodBody responseMethodBody = evt.getMethod();
+
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+ "] TX RES: Res[# " + requestId + "]; " + responseMethodBody);
+ }
+
+ ResponseStatus responseStatus = responseMap.get(requestId);
+ if (responseStatus == null)
+ {
+ throw new RequestResponseMappingException(requestId,
+ "Failed to locate requestId " + requestId + " in responseMap." + responseMap);
+ }
+ if (responseStatus.responseMethodBody != null)
+ {
+ throw new RequestResponseMappingException(requestId, "RequestId " +
+ requestId + " already has a response in responseMap.");
+ }
+ responseStatus.responseMethodBody = responseMethodBody;
+ return doBatches();
+ }
+
+ // *** Management functions ***
+
+ /**
+ * Sends batched responses - i.e. all those members of responseMap that have
+ * received a response.
+ */
+ public synchronized List<AMQFrame> doBatches()
+ {
+ long startRequestId = 0;
+ int numAdditionalRequestIds = 0;
+ Class responseMethodBodyClass = null;
+ List<AMQFrame> frames = new ArrayList<AMQFrame>();
+ Iterator<Long> lItr = responseMap.keySet().iterator();
+ while (lItr.hasNext())
+ {
+ long requestId = lItr.next();
+ ResponseStatus responseStatus = responseMap.get(requestId);
+ if (responseStatus.responseMethodBody != null)
+ {
+ frames.add(sendResponseBatchFrame(requestId, 0, responseStatus.responseMethodBody));
+ lItr.remove();
+ }
+ }
+
+ return frames;
+ }
+
+ /**
+ * Total number of entries in the responseMap - including both those that
+ * are outstanding (i.e. no response has been received) and those that are
+ * batched (those for which responses have been received but have not yet
+ * been collected together and sent).
+ */
+ public int responsesMapSize()
+ {
+ return responseMap.size();
+ }
+
+ /**
+ * As the responseMap may contain both outstanding responses (those with
+ * ResponseStatus.responseMethodBody still null) and responses waiting to
+ * be batched (those with ResponseStatus.responseMethodBody not null), we
+ * need to count only those in the map with responseMethodBody null.
+ */
+ public int outstandingResponses()
+ {
+ int cnt = 0;
+ for (Long requestId : responseMap.keySet())
+ {
+ if (responseMap.get(requestId).responseMethodBody == null)
+ cnt++;
+ }
+ return cnt;
+ }
+
+ /**
+ * As the responseMap may contain both outstanding responses (those with
+ * ResponseStatus.responseMethodBody still null) and responses waiting to
+ * be batched (those with ResponseStatus.responseMethodBody not null), we
+ * need to count only those in the map with responseMethodBody not null.
+ */
+ public int batchedResponses()
+ {
+ int cnt = 0;
+ for (Long requestId : responseMap.keySet())
+ {
+ if (responseMap.get(requestId).responseMethodBody != null)
+ cnt++;
+ }
+ return cnt;
+ }
+
+ // *** Private helper functions ***
+
+ private long getNextResponseId()
+ {
+ return responseIdCount++;
+ }
+
+ private AMQFrame sendResponseBatchFrame(long firstRequestId, int numAdditionalRequests,
+ AMQMethodBody responseMethodBody)
+ {
+ long responseId = getNextResponseId(); // Get new response ID
+ AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId,
+ firstRequestId, numAdditionalRequests, responseMethodBody);
+ return responseFrame;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
new file mode 100644
index 0000000000..d79525c5b2
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.nclient.message;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class AMQPApplicationMessage {
+
+ private int bytesReceived = 0;
+ private int channelId;
+ private byte[] referenceId;
+ private List<byte[]> contents = new LinkedList<byte[]>();
+ private long deliveryTag;
+ private boolean redeliveredFlag;
+ private MessageHeaders messageHeaders;
+
+ public AMQPApplicationMessage(int channelId, byte[] referenceId)
+ {
+ this.channelId = channelId;
+ this.referenceId = referenceId;
+ }
+
+ public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag)
+ {
+ this.channelId = channelId;
+ this.deliveryTag = deliveryTag;
+ this.messageHeaders = messageHeaders;
+ this.redeliveredFlag = redeliveredFlag;
+ }
+
+ public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content, boolean redeliveredFlag)
+ {
+ this.channelId = channelId;
+ this.deliveryTag = deliveryTag;
+ this.messageHeaders = messageHeaders;
+ this.redeliveredFlag = redeliveredFlag;
+ addContent(content);
+ }
+
+ public void addContent(byte[] content)
+ {
+ contents.add(content);
+ bytesReceived += content.length;
+ }
+
+ public int getBytesReceived()
+ {
+ return bytesReceived;
+ }
+
+ public int getChannelId()
+ {
+ return channelId;
+ }
+
+ public byte[] getReferenceId()
+ {
+ return referenceId;
+ }
+
+ public List<byte[]> getContents()
+ {
+ return contents;
+ }
+
+ public long getDeliveryTag()
+ {
+ return deliveryTag;
+ }
+
+ public boolean getRedeliveredFlag()
+ {
+ return redeliveredFlag;
+ }
+
+ public MessageHeaders getMessageHeaders()
+ {
+ return messageHeaders;
+ }
+
+ public String toString()
+ {
+ return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" +
+ deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" +
+ new String(contents.get(0));
+ }
+
+ public void setDeliveryTag(long deliveryTag)
+ {
+ this.deliveryTag = deliveryTag;
+ }
+
+ public void setMessageHeaders(MessageHeaders messageHeaders)
+ {
+ this.messageHeaders = messageHeaders;
+ }
+
+ public void setRedeliveredFlag(boolean redeliveredFlag)
+ {
+ this.redeliveredFlag = redeliveredFlag;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java
new file mode 100644
index 0000000000..562aa7b06e
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java
@@ -0,0 +1,679 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.qpid.nclient.message;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import java.util.Enumeration;
+
+public class MessageHeaders
+{
+ private static final Logger _logger = Logger.getLogger(MessageHeaders.class);
+
+ private AMQShortString _contentType;
+
+ private AMQShortString _encoding;
+
+ private AMQShortString _destination;
+
+ private AMQShortString _exchange;
+
+ private FieldTable _jmsHeaders;
+
+ private short _deliveryMode;
+
+ private short _priority;
+
+ private AMQShortString _correlationId;
+
+ private AMQShortString _replyTo;
+
+ private long _expiration;
+
+ private AMQShortString _messageId;
+
+ private long _timestamp;
+
+ private AMQShortString _type;
+
+ private AMQShortString _userId;
+
+ private AMQShortString _appId;
+
+ private AMQShortString _transactionId;
+
+ private AMQShortString _routingKey;
+
+ private int _size;
+
+ public int getSize()
+ {
+ return _size;
+ }
+
+ public void setSize(int size)
+ {
+ this._size = size;
+ }
+
+ public MessageHeaders()
+ {
+ }
+
+ public AMQShortString getContentType()
+ {
+ return _contentType;
+ }
+
+ public void setContentType(AMQShortString contentType)
+ {
+ _contentType = contentType;
+ }
+
+ public AMQShortString getEncoding()
+ {
+ return _encoding;
+ }
+
+ public void setEncoding(AMQShortString encoding)
+ {
+ _encoding = encoding;
+ }
+
+ public FieldTable getJMSHeaders()
+ {
+ if (_jmsHeaders == null)
+ {
+ setJMSHeaders(FieldTableFactory.newFieldTable());
+ }
+
+ return _jmsHeaders;
+ }
+
+ public void setJMSHeaders(FieldTable headers)
+ {
+ _jmsHeaders = headers;
+ }
+
+
+ public short getDeliveryMode()
+ {
+ return _deliveryMode;
+ }
+
+ public void setDeliveryMode(short deliveryMode)
+ {
+ _deliveryMode = deliveryMode;
+ }
+
+ public short getPriority()
+ {
+ return _priority;
+ }
+
+ public void setPriority(short priority)
+ {
+ _priority = priority;
+ }
+
+ public AMQShortString getCorrelationId()
+ {
+ return _correlationId;
+ }
+
+ public void setCorrelationId(AMQShortString correlationId)
+ {
+ _correlationId = correlationId;
+ }
+
+ public AMQShortString getReplyTo()
+ {
+ return _replyTo;
+ }
+
+ public void setReplyTo(AMQShortString replyTo)
+ {
+ _replyTo = replyTo;
+ }
+
+ public long getExpiration()
+ {
+ return _expiration;
+ }
+
+ public void setExpiration(long expiration)
+ {
+ _expiration = expiration;
+ }
+
+
+ public AMQShortString getMessageId()
+ {
+ return _messageId;
+ }
+
+ public void setMessageId(AMQShortString messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public long getTimestamp()
+ {
+ return _timestamp;
+ }
+
+ public void setTimestamp(long timestamp)
+ {
+ _timestamp = timestamp;
+ }
+
+ public AMQShortString getType()
+ {
+ return _type;
+ }
+
+ public void setType(AMQShortString type)
+ {
+ _type = type;
+ }
+
+ public AMQShortString getUserId()
+ {
+ return _userId;
+ }
+
+ public void setUserId(AMQShortString userId)
+ {
+ _userId = userId;
+ }
+
+ public AMQShortString getAppId()
+ {
+ return _appId;
+ }
+
+ public void setAppId(AMQShortString appId)
+ {
+ _appId = appId;
+ }
+
+ // MapMessage Interface
+
+ public boolean getBoolean(AMQShortString string) throws JMSException
+ {
+ Boolean b = getJMSHeaders().getBoolean(string);
+
+ if (b == null)
+ {
+ if (getJMSHeaders().containsKey(string))
+ {
+ Object str = getJMSHeaders().getObject(string);
+
+ if (str == null || !(str instanceof AMQShortString))
+ {
+ throw new MessageFormatException("getBoolean can't use " + string + " item.");
+ }
+ else
+ {
+ return Boolean.valueOf(((AMQShortString)str).asString());
+ }
+ }
+ else
+ {
+ b = Boolean.valueOf(null);
+ }
+ }
+
+ return b;
+ }
+
+ public char getCharacter(AMQShortString string) throws JMSException
+ {
+ Character c = getJMSHeaders().getCharacter(string);
+
+ if (c == null)
+ {
+ if (getJMSHeaders().isNullStringValue(string.asString()))
+ {
+ throw new NullPointerException("Cannot convert null char");
+ }
+ else
+ {
+ throw new MessageFormatException("getChar can't use " + string + " item.");
+ }
+ }
+ else
+ {
+ return (char) c;
+ }
+ }
+
+ public byte[] getBytes(AMQShortString string) throws JMSException
+ {
+ byte[] bs = getJMSHeaders().getBytes(string);
+
+ if (bs == null)
+ {
+ throw new MessageFormatException("getBytes can't use " + string + " item.");
+ }
+ else
+ {
+ return bs;
+ }
+ }
+
+ public byte getByte(AMQShortString string) throws JMSException
+ {
+ Byte b = getJMSHeaders().getByte(string);
+ if (b == null)
+ {
+ if (getJMSHeaders().containsKey(string))
+ {
+ Object str = getJMSHeaders().getObject(string);
+
+ if (str == null || !(str instanceof AMQShortString))
+ {
+ throw new MessageFormatException("getByte can't use " + string + " item.");
+ }
+ else
+ {
+ return Byte.valueOf(((AMQShortString)str).asString());
+ }
+ }
+ else
+ {
+ b = Byte.valueOf(null);
+ }
+ }
+
+ return b;
+ }
+
+ public short getShort(AMQShortString string) throws JMSException
+ {
+ Short s = getJMSHeaders().getShort(string);
+
+ if (s == null)
+ {
+ s = Short.valueOf(getByte(string));
+ }
+
+ return s;
+ }
+
+ public int getInteger(AMQShortString string) throws JMSException
+ {
+ Integer i = getJMSHeaders().getInteger(string);
+
+ if (i == null)
+ {
+ i = Integer.valueOf(getShort(string));
+ }
+
+ return i;
+ }
+
+ public long getLong(AMQShortString string) throws JMSException
+ {
+ Long l = getJMSHeaders().getLong(string);
+
+ if (l == null)
+ {
+ l = Long.valueOf(getInteger(string));
+ }
+
+ return l;
+ }
+
+ public float getFloat(AMQShortString string) throws JMSException
+ {
+ Float f = getJMSHeaders().getFloat(string);
+
+ if (f == null)
+ {
+ if (getJMSHeaders().containsKey(string))
+ {
+ Object str = getJMSHeaders().getObject(string);
+
+ if (str == null || !(str instanceof AMQShortString))
+ {
+ throw new MessageFormatException("getFloat can't use " + string + " item.");
+ }
+ else
+ {
+ return Float.valueOf(((AMQShortString)str).asString());
+ }
+ }
+ else
+ {
+ f = Float.valueOf(null);
+ }
+
+ }
+
+ return f;
+ }
+
+ public double getDouble(AMQShortString string) throws JMSException
+ {
+ Double d = getJMSHeaders().getDouble(string);
+
+ if (d == null)
+ {
+ d = Double.valueOf(getFloat(string));
+ }
+
+ return d;
+ }
+
+ public AMQShortString getString(AMQShortString string) throws JMSException
+ {
+ AMQShortString s = new AMQShortString(getJMSHeaders().getString(string.asString()));
+
+ if (s == null)
+ {
+ if (getJMSHeaders().containsKey(string))
+ {
+ Object o = getJMSHeaders().getObject(string);
+ if (o instanceof byte[])
+ {
+ throw new MessageFormatException("getObject couldn't find " + string + " item.");
+ }
+ else
+ {
+ if (o == null)
+ {
+ return null;
+ }
+ else
+ {
+ s = (AMQShortString) o;
+ }
+ }
+ }
+ }
+
+ return s;
+ }
+
+ public Object getObject(AMQShortString string) throws JMSException
+ {
+ return getJMSHeaders().getObject(string);
+ }
+
+ public void setBoolean(AMQShortString string, boolean b) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setBoolean(string, b);
+ }
+
+ public void setChar(AMQShortString string, char c) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setChar(string, c);
+ }
+
+ public Object setBytes(AMQShortString string, byte[] bytes)
+ {
+ return getJMSHeaders().setBytes(string, bytes);
+ }
+
+ public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
+ {
+ return getJMSHeaders().setBytes(string, bytes, start, length);
+ }
+
+ public void setByte(AMQShortString string, byte b) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setByte(string, b);
+ }
+
+ public void setShort(AMQShortString string, short i) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setShort(string, i);
+ }
+
+ public void setInteger(AMQShortString string, int i) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setInteger(string, i);
+ }
+
+ public void setLong(AMQShortString string, long l) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setLong(string, l);
+ }
+
+ public void setFloat(AMQShortString string, float v) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setFloat(string, v);
+ }
+
+ public void setDouble(AMQShortString string, double v) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setDouble(string, v);
+ }
+
+ public void setString(AMQShortString string, AMQShortString string1) throws JMSException
+ {
+ checkPropertyName(string);
+ getJMSHeaders().setString(string.asString(), string1.asString());
+ }
+
+ public void setObject(AMQShortString string, Object object) throws JMSException
+ {
+ checkPropertyName(string);
+ try
+ {
+ getJMSHeaders().setObject(string, object);
+ }
+ catch (AMQPInvalidClassException aice)
+ {
+ throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+ }
+ }
+
+ public boolean itemExists(AMQShortString string) throws JMSException
+ {
+ return getJMSHeaders().containsKey(string);
+ }
+
+ public Enumeration getPropertyNames()
+ {
+ return getJMSHeaders().getPropertyNames();
+ }
+
+ public void clear()
+ {
+ getJMSHeaders().clear();
+ }
+
+ public boolean propertyExists(AMQShortString propertyName)
+ {
+ return getJMSHeaders().propertyExists(propertyName);
+ }
+
+ public Object put(Object key, Object value)
+ {
+ return getJMSHeaders().setObject(key.toString(), value);
+ }
+
+ public Object remove(AMQShortString propertyName)
+ {
+ return getJMSHeaders().remove(propertyName);
+ }
+
+ public boolean isEmpty()
+ {
+ return getJMSHeaders().isEmpty();
+ }
+
+ public void writeToBuffer(ByteBuffer data)
+ {
+ getJMSHeaders().writeToBuffer(data);
+ }
+
+ public Enumeration getMapNames()
+ {
+ return getPropertyNames();
+ }
+
+ protected static void checkPropertyName(CharSequence propertyName)
+ {
+ if (propertyName == null)
+ {
+ throw new IllegalArgumentException("Property name must not be null");
+ }
+ else if (propertyName.length() == 0)
+ {
+ throw new IllegalArgumentException("Property name must not be the empty string");
+ }
+
+ checkIdentiferFormat(propertyName);
+ }
+
+ protected static void checkIdentiferFormat(CharSequence propertyName)
+ {
+// JMS requirements 3.5.1 Property Names
+// Identifiers:
+// - An identifier is an unlimited-length character sequence that must begin
+// with a Java identifier start character; all following characters must be Java
+// identifier part characters. An identifier start character is any character for
+// which the method Character.isJavaIdentifierStart returns true. This includes
+// '_' and '$'. An identifier part character is any character for which the
+// method Character.isJavaIdentifierPart returns true.
+// - Identifiers cannot be the names NULL, TRUE, or FALSE.
+// – Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
+// ESCAPE.
+// – Identifiers are either header field references or property references. The
+// type of a property value in a message selector corresponds to the type
+// used to set the property. If a property that does not exist in a message is
+// referenced, its value is NULL. The semantics of evaluating NULL values
+// in a selector are described in Section 3.8.1.2, “Null Values.”
+// – The conversions that apply to the get methods for properties do not
+// apply when a property is used in a message selector expression. For
+// example, suppose you set a property as a string value, as in the
+// following:
+// myMessage.setStringProperty("NumberOfOrders", "2");
+// The following expression in a message selector would evaluate to false,
+// because a string cannot be used in an arithmetic expression:
+// "NumberOfOrders > 1"
+// – Identifiers are case sensitive.
+// – Message header field references are restricted to JMSDeliveryMode,
+// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
+// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
+// null and if so are treated as a NULL value.
+
+ if (Boolean.getBoolean("strict-jms"))
+ {
+ // JMS start character
+ if (!(Character.isJavaIdentifierStart(propertyName.charAt(0))))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character");
+ }
+
+ // JMS part character
+ int length = propertyName.length();
+ for (int c = 1; c < length; c++)
+ {
+ if (!(Character.isJavaIdentifierPart(propertyName.charAt(c))))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character");
+ }
+ }
+
+
+
+
+ // JMS invalid names
+ if ((propertyName.equals("NULL")
+ || propertyName.equals("TRUE")
+ || propertyName.equals("FALSE")
+ || propertyName.equals("NOT")
+ || propertyName.equals("AND")
+ || propertyName.equals("OR")
+ || propertyName.equals("BETWEEN")
+ || propertyName.equals("LIKE")
+ || propertyName.equals("IN")
+ || propertyName.equals("IS")
+ || propertyName.equals("ESCAPE")))
+ {
+ throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
+ }
+ }
+
+ }
+
+ public AMQShortString getTransactionId()
+ {
+ return _transactionId;
+ }
+
+ public void setTransactionId(AMQShortString id)
+ {
+ _transactionId = id;
+ }
+
+ public AMQShortString getDestination()
+ {
+ return _destination;
+ }
+
+ public void setDestination(AMQShortString destination)
+ {
+ this._destination = destination;
+ }
+
+ public AMQShortString getExchange()
+ {
+ return _exchange;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ this._exchange = exchange;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+ public void setRoutingKey(AMQShortString routingKey)
+ {
+ this._routingKey = routingKey;
+ }
+}
+
+
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java
new file mode 100644
index 0000000000..53a2142718
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.message;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.AbstractPhase;
+import org.apache.qpid.nclient.model.ModelPhase;
+
+public class MessagePhase extends AbstractPhase {
+
+ private final BlockingQueue<AMQPApplicationMessage> _queue = new LinkedBlockingQueue<AMQPApplicationMessage>();
+ private static final Logger _logger = Logger.getLogger(ModelPhase.class);
+
+ public void messageReceived(Object msg) throws AMQPException
+ {
+ try
+ {
+ _queue.put((AMQPApplicationMessage)msg);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.error("Error adding message to queue", e);
+ }
+ super.messageReceived(msg);
+ }
+
+ public void messageSent(Object msg) throws AMQPException
+ {
+ super.messageSent(msg);
+ }
+
+ public AMQPApplicationMessage getNextMessage()
+ {
+ return _queue.poll();
+ }
+
+ public AMQPApplicationMessage getNextMessage(long timeout, TimeUnit tu) throws InterruptedException
+ {
+ return _queue.poll(timeout, tu);
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
new file mode 100644
index 0000000000..efd7264f96
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
@@ -0,0 +1,16 @@
+package org.apache.qpid.nclient.message;
+
+import org.apache.qpid.AMQException;
+
+public interface MessageStore {
+
+ public void removeMessage(String identifier);
+
+ public void storeContentBodyChunk(String identifier,byte[] contentBody) throws AMQException;
+
+ public void storeMessageMetaData(String identifier, MessageHeaders messageHeaders) throws AMQException;
+
+ public AMQPApplicationMessage getMessage(String identifier) throws AMQException;
+
+ public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException;
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
new file mode 100644
index 0000000000..eb5a9c1778
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
@@ -0,0 +1,39 @@
+package org.apache.qpid.nclient.message;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.AMQException;
+
+public class TransientMessageStore implements MessageStore {
+
+ private Map<String,AMQPApplicationMessage> messageMap = new ConcurrentHashMap<String,AMQPApplicationMessage>();
+
+ public AMQPApplicationMessage getMessage(String identifier)
+ throws AMQException
+ {
+ return messageMap.get(identifier);
+ }
+
+ public void removeMessage(String identifier)
+ {
+ messageMap.remove(identifier);
+ }
+
+ public void storeContentBodyChunk(String identifier, byte[] contentBody)
+ throws AMQException
+ {
+
+ }
+
+ public void storeMessageMetaData(String identifier,
+ MessageHeaders messageHeaders) throws AMQException
+ {
+
+ }
+
+ public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException
+ {
+
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
new file mode 100644
index 0000000000..d845059ee7
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
@@ -0,0 +1,71 @@
+package org.apache.qpid.nclient.model;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.AbstractPhase;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+/**
+ * This Phase handles Layer 3 functionality of the AMQP spec.
+ * This class acts as the interface between the API and the pipeline
+ */
+public class ModelPhase extends AbstractPhase {
+
+ private static final Logger _logger = Logger.getLogger(ModelPhase.class);
+
+ private Map <Class,List> _methodListners = new HashMap<Class,List>();
+
+ /**
+ * ------------------------------------------------
+ * Phase - methods introduced by Phase
+ * ------------------------------------------------
+ */
+ public void init(PhaseContext ctx, Phase nextInFlowPhase, Phase nextOutFlowPhase)
+ {
+ super.init(ctx, nextInFlowPhase, nextOutFlowPhase);
+ }
+
+ public void messageReceived(Object msg) throws AMQPException
+ {
+ notifyMethodListerners((AMQPMethodEvent)msg);
+
+ // not doing super.methodReceived here, as this is the end of
+ // the pipeline
+ //super.messageReceived(msg);
+ }
+
+ /**
+ * This method should only except and pass messages
+ * of Type @see AMQPMethodEvent
+ */
+ public void messageSent(Object msg) throws AMQPException
+ {
+ super.messageSent(msg);
+ }
+
+ /**
+ * ------------------------------------------------
+ * Event Handling
+ * ------------------------------------------------
+ */
+
+ public void notifyMethodListerners(AMQPMethodEvent event) throws AMQPException
+ {
+ AMQPEventManager eventManager = (AMQPEventManager)_ctx.getProperty(QpidConstants.EVENT_MANAGER);
+ eventManager.notifyEvent(event);
+ }
+
+ /**
+ * ------------------------------------------------
+ * Configuration
+ * ------------------------------------------------
+ */
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java
new file mode 100644
index 0000000000..fc5878f6ef
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import javax.security.auth.callback.CallbackHandler;
+
+import org.apache.qpid.nclient.transport.ConnectionURL;
+
+public interface AMQPCallbackHandler extends CallbackHandler
+{
+ void initialise(ConnectionURL url);
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
new file mode 100644
index 0000000000..428cd6753d
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+public class CallbackHandlerRegistry
+{
+ private static final Logger _logger = Logger.getLogger(CallbackHandlerRegistry.class);
+
+ private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry();
+
+ private Map<String,Class> _mechanismToHandlerClassMap = new HashMap<String,Class>();
+
+ private String _mechanisms;
+
+ public static CallbackHandlerRegistry getInstance()
+ {
+ return _instance;
+ }
+
+ public Class getCallbackHandlerClass(String mechanism)
+ {
+ return _mechanismToHandlerClassMap.get(mechanism);
+ }
+
+ public String getMechanisms()
+ {
+ return _mechanisms;
+ }
+
+ private CallbackHandlerRegistry()
+ {
+ // first we register any Sasl client factories
+ DynamicSaslRegistrar.registerSaslProviders();
+ parseProperties();
+ }
+
+ private void parseProperties()
+ {
+ String key = QpidConstants.AMQP_SECURITY + "." +
+ QpidConstants.AMQP_SECURITY_MECHANISMS + "." +
+ QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER;
+
+ int index = ClientConfiguration.get().getMaxIndex(key);
+
+ for (int i=0; i<index+1;i++)
+ {
+ String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]");
+ String className = ClientConfiguration.get().getString(key + "(" + i + ")" );
+ Class clazz = null;
+ try
+ {
+ clazz = Class.forName(className);
+ if (!AMQPCallbackHandler.class.isAssignableFrom(clazz))
+ {
+ _logger.warn("SASL provider " + clazz + " does not implement " + AMQPCallbackHandler.class +
+ ". Skipping");
+ continue;
+ }
+ _mechanismToHandlerClassMap.put(mechanism, clazz);
+ if (_mechanisms == null)
+ {
+ _mechanisms = mechanism;
+ }
+ else
+ {
+ // one time cost
+ _mechanisms = _mechanisms + " " + mechanism;
+ }
+ }
+ catch (ClassNotFoundException ex)
+ {
+ _logger.warn("Unable to load class " + className + ". Skipping that SASL provider");
+ continue;
+ }
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java
new file mode 100644
index 0000000000..958c6c4782
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import java.security.Security;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.security.sasl.SaslClientFactory;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+public class DynamicSaslRegistrar
+{
+ private static final Logger _logger = Logger.getLogger(DynamicSaslRegistrar.class);
+
+ public static void registerSaslProviders()
+ {
+ Map<String, Class<? extends SaslClientFactory>> factories = parseProperties();
+ if (factories.size() > 0)
+ {
+ Security.addProvider(new JCAProvider(factories));
+ _logger.debug("Dynamic SASL provider added as a security provider");
+ }
+ }
+
+ private static Map<String, Class<? extends SaslClientFactory>> parseProperties()
+ {
+ List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES);
+ TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister =
+ new TreeMap<String, Class<? extends SaslClientFactory>>();
+ for (String mechanism: mechanisms)
+ {
+ String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY + "_" + mechanism);
+ try
+ {
+ Class<?> clazz = Class.forName(className);
+ if (!(SaslClientFactory.class.isAssignableFrom(clazz)))
+ {
+ _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping");
+ continue;
+ }
+ factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz);
+ }
+ catch (Exception ex)
+ {
+ _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping");
+ }
+ }
+ return factoriesToRegister;
+ }
+
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java
new file mode 100644
index 0000000000..10ccb88821
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import javax.security.sasl.SaslClientFactory;
+import java.security.Provider;
+import java.security.Security;
+import java.util.Map;
+
+public class JCAProvider extends Provider
+{
+ public JCAProvider(Map<String, Class<? extends SaslClientFactory>> providerMap)
+ {
+ super("AMQSASLProvider", 1.0, "A JCA provider that registers all " +
+ "AMQ SASL providers that want to be registered");
+ register(providerMap);
+ Security.addProvider(this);
+ }
+
+ private void register(Map<String, Class<? extends SaslClientFactory>> providerMap)
+ {
+ for (Map.Entry<String, Class<? extends SaslClientFactory>> me :
+ providerMap.entrySet())
+ {
+ put("SaslClientFactory." + me.getKey(), me.getValue().getName());
+ }
+ }
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java
new file mode 100644
index 0000000000..7297d07134
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import java.io.IOException;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+
+import org.apache.qpid.nclient.transport.ConnectionURL;
+
+public class UsernamePasswordCallbackHandler implements AMQPCallbackHandler
+{
+ private ConnectionURL _url;
+
+ public void initialise(ConnectionURL url)
+ {
+ _url = url;
+ }
+
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
+ {
+ for (int i = 0; i < callbacks.length; i++)
+ {
+ Callback cb = callbacks[i];
+ if (cb instanceof NameCallback)
+ {
+ ((NameCallback)cb).setName(_url.getUsername());
+ }
+ else if (cb instanceof PasswordCallback)
+ {
+ ((PasswordCallback)cb).setPassword((_url.getPassword()).toCharArray());
+ }
+ else
+ {
+ throw new UnsupportedCallbackException(cb);
+ }
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java
new file mode 100644
index 0000000000..1097346c1d
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security.amqplain;
+
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.Callback;
+
+/**
+ * Implements the "AMQPlain" authentication protocol that uses FieldTables to send username and pwd.
+ *
+ */
+public class AmqPlainSaslClient implements SaslClient
+{
+ /**
+ * The name of this mechanism
+ */
+ public static final String MECHANISM = "AMQPLAIN";
+
+ private CallbackHandler _cbh;
+
+ public AmqPlainSaslClient(CallbackHandler cbh)
+ {
+ _cbh = cbh;
+ }
+
+ public String getMechanismName()
+ {
+ return "AMQPLAIN";
+ }
+
+ public boolean hasInitialResponse()
+ {
+ return true;
+ }
+
+ public byte[] evaluateChallenge(byte[] challenge) throws SaslException
+ {
+ // we do not care about the prompt or the default name
+ NameCallback nameCallback = new NameCallback("prompt", "defaultName");
+ PasswordCallback pwdCallback = new PasswordCallback("prompt", false);
+ Callback[] callbacks = new Callback[]{nameCallback, pwdCallback};
+ try
+ {
+ _cbh.handle(callbacks);
+ }
+ catch (Exception e)
+ {
+ throw new SaslException("Error handling SASL callbacks: " + e, e);
+ }
+ FieldTable table = FieldTableFactory.newFieldTable();
+ table.setString("LOGIN", nameCallback.getName());
+ table.setString("PASSWORD", new String(pwdCallback.getPassword()));
+ return table.getDataAsBytes();
+ }
+
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException
+ {
+ throw new SaslException("Not supported");
+ }
+
+ public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException
+ {
+ throw new SaslException("Not supported");
+ }
+
+ public Object getNegotiatedProperty(String propName)
+ {
+ return null;
+ }
+
+ public void dispose() throws SaslException
+ {
+ _cbh = null;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java
new file mode 100644
index 0000000000..f98c1e3a58
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security.amqplain;
+
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.Sasl;
+import javax.security.auth.callback.CallbackHandler;
+import java.util.Map;
+
+public class AmqPlainSaslClientFactory implements SaslClientFactory
+{
+ public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException
+ {
+ for (int i = 0; i < mechanisms.length; i++)
+ {
+ if (mechanisms[i].equals(AmqPlainSaslClient.MECHANISM))
+ {
+ if (cbh == null)
+ {
+ throw new SaslException("CallbackHandler must not be null");
+ }
+ return new AmqPlainSaslClient(cbh);
+ }
+ }
+ return null;
+ }
+
+ public String[] getMechanismNames(Map props)
+ {
+ if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE))
+ {
+ // returned array must be non null according to interface documentation
+ return new String[0];
+ }
+ else
+ {
+ return new String[]{AmqPlainSaslClient.MECHANISM};
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java
new file mode 100644
index 0000000000..9e878fb839
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java
@@ -0,0 +1,344 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
+import java.util.HashMap;
+import java.net.URISyntaxException;
+import java.net.URI;
+
+public class AMQPBrokerDetails implements BrokerDetails
+{
+ private String _host;
+
+ private int _port;
+
+ private String _transport;
+
+ private HashMap<String, String> _options;
+
+ public AMQPBrokerDetails()
+ {
+ _options = new HashMap<String, String>();
+ }
+
+ public AMQPBrokerDetails(String url) throws URLSyntaxException
+ {
+ this();
+ // URL should be of format tcp://host:port?option='value',option='value'
+ try
+ {
+ URI connection = new URI(url);
+
+ String transport = connection.getScheme();
+
+ // Handles some defaults to minimise changes to existing broker URLS e.g. localhost
+ if (transport != null)
+ {
+ //todo this list of valid transports should be enumerated somewhere
+ if ((!(transport.equalsIgnoreCase("vm") || transport.equalsIgnoreCase("tcp"))))
+ {
+ if (transport.equalsIgnoreCase("localhost"))
+ {
+ connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+ transport = connection.getScheme();
+ }
+ else
+ {
+ if (url.charAt(transport.length()) == ':' && url.charAt(transport.length() + 1) != '/')
+ {
+ //Then most likely we have a host:port value
+ connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+ transport = connection.getScheme();
+ }
+ else
+ {
+ URLHelper.parseError(0, transport.length(), "Unknown transport", url);
+ }
+ }
+ }
+ }
+ else
+ {
+ //Default the transport
+ connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+ transport = connection.getScheme();
+ }
+
+ if (transport == null)
+ {
+ URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" + " In broker URL:'" + url
+ + "' Format: " + URL_FORMAT_EXAMPLE, "");
+ }
+
+ setTransport(transport);
+
+ String host = connection.getHost();
+
+ // Fix for Java 1.5
+ if (host == null)
+ {
+ host = "";
+ }
+
+ setHost(host);
+
+ int port = connection.getPort();
+
+ if (port == -1)
+ {
+ // Fix for when there is port data but it is not automatically parseable by getPort().
+ String auth = connection.getAuthority();
+
+ if (auth != null && auth.contains(":"))
+ {
+ int start = auth.indexOf(":") + 1;
+ int end = start;
+ boolean looking = true;
+ boolean found = false;
+ //Walk the authority looking for a port value.
+ while (looking)
+ {
+ try
+ {
+ end++;
+ Integer.parseInt(auth.substring(start, end));
+
+ if (end >= auth.length())
+ {
+ looking = false;
+ found = true;
+ }
+ }
+ catch (NumberFormatException nfe)
+ {
+ looking = false;
+ }
+
+ }
+ if (found)
+ {
+ setPort(Integer.parseInt(auth.substring(start, end)));
+ }
+ else
+ {
+ URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1,
+ "Illegal character in port number", connection.toString());
+ }
+
+ }
+ else
+ {
+ setPort(DEFAULT_PORT);
+ }
+ }
+ else
+ {
+ setPort(port);
+ }
+
+ String queryString = connection.getQuery();
+
+ URLHelper.parseOptions(_options, queryString);
+
+ //Fragment is #string (not used)
+ }
+ catch (URISyntaxException uris)
+ {
+ if (uris instanceof URLSyntaxException)
+ {
+ throw (URLSyntaxException) uris;
+ }
+
+ URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+ }
+ }
+
+ public AMQPBrokerDetails(String host, int port, boolean useSSL)
+ {
+ _host = host;
+ _port = port;
+
+ if (useSSL)
+ {
+ setOption(OPTIONS_SSL, "true");
+ }
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public void setHost(String _host)
+ {
+ this._host = _host;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+
+ public void setPort(int _port)
+ {
+ this._port = _port;
+ }
+
+ public String getTransport()
+ {
+ return _transport;
+ }
+
+ public void setTransport(String _transport)
+ {
+ this._transport = _transport;
+ }
+
+ public String getOption(String key)
+ {
+ return _options.get(key);
+ }
+
+ public void setOption(String key, String value)
+ {
+ _options.put(key, value);
+ }
+
+ public long getTimeout()
+ {
+ if (_options.containsKey(OPTIONS_CONNECT_TIMEOUT))
+ {
+ try
+ {
+ return Long.parseLong(_options.get(OPTIONS_CONNECT_TIMEOUT));
+ }
+ catch (NumberFormatException nfe)
+ {
+ //Do nothing as we will use the default below.
+ }
+ }
+
+ return BrokerDetails.DEFAULT_CONNECT_TIMEOUT;
+ }
+
+ public void setTimeout(long timeout)
+ {
+ setOption(OPTIONS_CONNECT_TIMEOUT, Long.toString(timeout));
+ }
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append(_transport);
+ sb.append("://");
+
+ if (!(_transport.equalsIgnoreCase("vm")))
+ {
+ sb.append(_host);
+ }
+
+ sb.append(':');
+ sb.append(_port);
+
+ sb.append(printOptionsURL());
+
+ return sb.toString();
+ }
+
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof BrokerDetails))
+ {
+ return false;
+ }
+
+ BrokerDetails bd = (BrokerDetails) o;
+
+ return _host.equalsIgnoreCase(bd.getHost()) && (_port == bd.getPort())
+ && _transport.equalsIgnoreCase(bd.getTransport()) && (useSSL() == bd.useSSL());
+
+ //todo do we need to compare all the options as well?
+ }
+
+ private String printOptionsURL()
+ {
+ StringBuffer optionsURL = new StringBuffer();
+
+ optionsURL.append('?');
+
+ if (!(_options.isEmpty()))
+ {
+
+ for (String key : _options.keySet())
+ {
+ optionsURL.append(key);
+
+ optionsURL.append("='");
+
+ optionsURL.append(_options.get(key));
+
+ optionsURL.append("'");
+
+ optionsURL.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+ }
+
+ //removeKey the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
+ optionsURL.deleteCharAt(optionsURL.length() - 1);
+
+ return optionsURL.toString();
+ }
+
+ public boolean useSSL()
+ {
+ // To be friendly to users we should be case insensitive.
+ // or simply force users to conform to OPTIONS_SSL
+ // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL
+
+ if (_options.containsKey(OPTIONS_SSL))
+ {
+ return _options.get(OPTIONS_SSL).equalsIgnoreCase("true");
+ }
+
+ return USE_SSL_DEFAULT;
+ }
+
+ public void useSSL(boolean ssl)
+ {
+ setOption(OPTIONS_SSL, Boolean.toString(ssl));
+ }
+
+ public static String checkTransport(String broker)
+ {
+ if ((!broker.contains("://")))
+ {
+ return "tcp://" + broker;
+ }
+ else
+ {
+ return broker;
+ }
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java
new file mode 100644
index 0000000000..d0259830c6
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java
@@ -0,0 +1,412 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
+import java.util.*;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+public class AMQPConnectionURL implements ConnectionURL
+{
+ private String _url;
+ private String _failoverMethod;
+ private HashMap<String, String> _failoverOptions;
+ private HashMap<String, String> _options;
+ private List<BrokerDetails> _brokers;
+ private String _clientName;
+ private String _username;
+ private String _password;
+ private String _virtualHost;
+
+ public AMQPConnectionURL(String fullURL) throws URLSyntaxException
+ {
+ _url = fullURL;
+ _options = new HashMap<String, String>();
+ _brokers = new LinkedList<BrokerDetails>();
+ _failoverOptions = new HashMap<String, String>();
+
+ try
+ {
+ URI connection = new URI(fullURL);
+
+ if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL)))
+ {
+ throw new URISyntaxException(fullURL, "Not an AMQP URL");
+ }
+
+ if (connection.getHost() == null || connection.getHost().equals(""))
+ {
+ String uid = getUniqueClientID();
+ if (uid == null)
+ {
+ URLHelper.parseError(-1, "Client Name not specified", fullURL);
+ }
+ else
+ {
+ setClientName(uid);
+ }
+
+ }
+ else
+ {
+ setClientName(connection.getHost());
+ }
+
+ String userInfo = connection.getUserInfo();
+
+ if (userInfo == null)
+ {
+ //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs
+ userInfo = connection.getAuthority();
+
+ if (userInfo != null)
+ {
+ int atIndex = userInfo.indexOf('@');
+
+ if (atIndex != -1)
+ {
+ userInfo = userInfo.substring(0, atIndex);
+ }
+ else
+ {
+ userInfo = null;
+ }
+ }
+
+ }
+
+ if (userInfo == null)
+ {
+ URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
+ "User information not found on url", fullURL);
+ }
+ else
+ {
+ parseUserInfo(userInfo);
+ }
+ String virtualHost = connection.getPath();
+
+ if (virtualHost != null && (!virtualHost.equals("")))
+ {
+ setVirtualHost(virtualHost);
+ }
+ else
+ {
+ int authLength = connection.getAuthority().length();
+ int start = AMQ_PROTOCOL.length() + 3;
+ int testIndex = start + authLength;
+ if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?')
+ {
+ URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
+ }
+ else
+ {
+ URLHelper.parseError(-1, "Virtual host not specified", fullURL);
+ }
+
+ }
+
+
+ URLHelper.parseOptions(_options, connection.getQuery());
+
+ processOptions();
+
+ //Fragment is #string (not used)
+ //System.out.println(connection.getFragment());
+
+ }
+ catch (URISyntaxException uris)
+ {
+ if (uris instanceof URLSyntaxException)
+ {
+ throw (URLSyntaxException) uris;
+ }
+
+ int slash = fullURL.indexOf("\\");
+
+ if (slash == -1)
+ {
+ URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+ }
+ else
+ {
+ if (slash != 0 && fullURL.charAt(slash - 1) == ':')
+ {
+ URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
+ }
+ else
+ {
+ URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL);
+ }
+ }
+
+ }
+ }
+
+ private String getUniqueClientID()
+ {
+ try
+ {
+ InetAddress addr = InetAddress.getLocalHost();
+ return addr.getHostName() + System.currentTimeMillis();
+ }
+ catch (UnknownHostException e)
+ {
+ return null;
+ }
+ }
+
+ private void parseUserInfo(String userinfo) throws URLSyntaxException
+ {
+ //user info = user:pass
+
+ int colonIndex = userinfo.indexOf(':');
+
+ if (colonIndex == -1)
+ {
+ URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
+ "Null password in user information not allowed.", _url);
+ }
+ else
+ {
+ setUsername(userinfo.substring(0, colonIndex));
+ setPassword(userinfo.substring(colonIndex + 1));
+ }
+
+ }
+
+ private void processOptions() throws URLSyntaxException
+ {
+ if (_options.containsKey(OPTIONS_BROKERLIST))
+ {
+ String brokerlist = _options.get(OPTIONS_BROKERLIST);
+
+ //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'
+ StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR);
+
+ while (st.hasMoreTokens())
+ {
+ String broker = st.nextToken();
+
+ _brokers.add(new AMQPBrokerDetails(broker));
+ }
+
+ _options.remove(OPTIONS_BROKERLIST);
+ }
+
+ if (_options.containsKey(OPTIONS_FAILOVER))
+ {
+ String failover = _options.get(OPTIONS_FAILOVER);
+
+ // failover='method?option='value',option='value''
+
+ int methodIndex = failover.indexOf('?');
+
+ if (methodIndex > -1)
+ {
+ _failoverMethod = failover.substring(0, methodIndex);
+ URLHelper.parseOptions(_failoverOptions, failover.substring(methodIndex + 1));
+ }
+ else
+ {
+ _failoverMethod = failover;
+ }
+
+ _options.remove(OPTIONS_FAILOVER);
+ }
+ }
+
+ public String getURL()
+ {
+ return _url;
+ }
+
+ public String getFailoverMethod()
+ {
+ return _failoverMethod;
+ }
+
+ public String getFailoverOption(String key)
+ {
+ return _failoverOptions.get(key);
+ }
+
+ public void setFailoverOption(String key, String value)
+ {
+ _failoverOptions.put(key, value);
+ }
+
+ public int getBrokerCount()
+ {
+ return _brokers.size();
+ }
+
+ public BrokerDetails getBrokerDetails(int index)
+ {
+ if (index < _brokers.size())
+ {
+ return _brokers.get(index);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void addBrokerDetails(BrokerDetails broker)
+ {
+ if (!(_brokers.contains(broker)))
+ {
+ _brokers.add(broker);
+ }
+ }
+
+ public List<BrokerDetails> getAllBrokerDetails()
+ {
+ return _brokers;
+ }
+
+ public String getClientName()
+ {
+ return _clientName;
+ }
+
+ public void setClientName(String clientName)
+ {
+ _clientName = clientName;
+ }
+
+ public String getUsername()
+ {
+ return _username;
+ }
+
+ public void setUsername(String username)
+ {
+ _username = username;
+ }
+
+ public String getPassword()
+ {
+ return _password;
+ }
+
+ public void setPassword(String password)
+ {
+ _password = password;
+ }
+
+ public String getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public void setVirtualHost(String virtuaHost)
+ {
+ _virtualHost = virtuaHost;
+ }
+
+ public String getOption(String key)
+ {
+ return _options.get(key);
+ }
+
+ public void setOption(String key, String value)
+ {
+ _options.put(key, value);
+ }
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append(AMQ_PROTOCOL);
+ sb.append("://");
+
+ if (_username != null)
+ {
+ sb.append(_username);
+
+ if (_password != null)
+ {
+ sb.append(':');
+ sb.append(_password);
+ }
+
+ sb.append('@');
+ }
+
+ sb.append(_clientName);
+
+ sb.append(_virtualHost);
+
+ sb.append(optionsToString());
+
+ return sb.toString();
+ }
+
+ private String optionsToString()
+ {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append("?" + OPTIONS_BROKERLIST + "='");
+
+ for (BrokerDetails service : _brokers)
+ {
+ sb.append(service.toString());
+ sb.append(';');
+ }
+
+ sb.deleteCharAt(sb.length() - 1);
+ sb.append("'");
+
+ if (_failoverMethod != null)
+ {
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ sb.append(OPTIONS_FAILOVER + "='");
+ sb.append(_failoverMethod);
+ sb.append(URLHelper.printOptions(_failoverOptions));
+ sb.append("'");
+ }
+
+ return sb.toString();
+ }
+
+
+ public static void main(String[] args) throws URLSyntaxException
+ {
+
+ String url2 = "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'";
+ //"amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''";
+
+ //ConnectionURL connectionurl2 = new AMQConnectionURL(url2);
+
+ System.out.println(url2);
+ //System.out.println(connectionurl2);
+
+ }
+
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java
new file mode 100644
index 0000000000..fe20a1e8dd
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+public interface BrokerDetails
+{
+
+ /*
+ * Known URL Options
+ * @see ConnectionURL
+ */
+ public static final String OPTIONS_RETRY = "retries";
+ public static final String OPTIONS_SSL = ConnectionURL.OPTIONS_SSL;
+ public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
+ public static final int DEFAULT_PORT = 5672;
+
+ public static final String TCP = "tcp";
+ public static final String VM = "vm";
+
+ public static final String DEFAULT_TRANSPORT = TCP;
+
+ public static final String URL_FORMAT_EXAMPLE =
+ "<transport>://<hostname>[:<port Default=\"" + DEFAULT_PORT + "\">][?<option>='<value>'[,<option>='<value>']]";
+
+ public static final long DEFAULT_CONNECT_TIMEOUT = 30000L;
+ public static final boolean USE_SSL_DEFAULT = false;
+
+ String getHost();
+
+ void setHost(String host);
+
+ int getPort();
+
+ void setPort(int port);
+
+ String getTransport();
+
+ void setTransport(String transport);
+
+ boolean useSSL();
+
+ void useSSL(boolean ssl);
+
+ String getOption(String key);
+
+ void setOption(String key, String value);
+
+ long getTimeout();
+
+ void setTimeout(long timeout);
+
+ String toString();
+
+ boolean equals(Object o);
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java
new file mode 100644
index 0000000000..79653dc442
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import java.util.List;
+
+/**
+ Connection URL format
+ For TCP:
+ amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\'
+
+ For VMBroker:
+ vm://:3/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''"
+
+ Options are of course optional except for requiring a single broker in the broker list.
+ The option seperator is defined to be either '&' or ','
+ */
+public interface ConnectionURL
+{
+ public static final String AMQ_PROTOCOL = "amqp";
+ public static final String OPTIONS_BROKERLIST = "brokerlist";
+ public static final String OPTIONS_FAILOVER = "failover";
+ public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
+ public static final String OPTIONS_SSL = "ssl";
+
+ String getURL();
+
+ String getFailoverMethod();
+
+ String getFailoverOption(String key);
+
+ int getBrokerCount();
+
+ BrokerDetails getBrokerDetails(int index);
+
+ void addBrokerDetails(BrokerDetails broker);
+
+ List<BrokerDetails> getAllBrokerDetails();
+
+ String getClientName();
+
+ void setClientName(String clientName);
+
+ String getUsername();
+
+ void setUsername(String username);
+
+ String getPassword();
+
+ void setPassword(String password);
+
+ String getVirtualHost();
+
+ void setVirtualHost(String virtualHost);
+
+ String getOption(String key);
+
+ void setOption(String key, String value);
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
new file mode 100644
index 0000000000..734aa68a9d
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
@@ -0,0 +1,83 @@
+package org.apache.qpid.nclient.transport;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.DefaultPhaseContext;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.PhaseFactory;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+
+public class TCPConnection implements TransportConnection
+{
+ private static final Logger _logger = Logger.getLogger(TCPConnection.class);
+ private BrokerDetails _brokerDetails;
+ private IoConnector _ioConnector;
+ private Phase _phase;
+ private PhaseContext _ctx;
+
+ protected TCPConnection(ConnectionURL url, PhaseContext ctx)
+ {
+ _brokerDetails = url.getBrokerDetails(0);
+ _ctx = ctx;
+
+ ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS));
+
+ // the MINA default is currently to use the pooled allocator although this may change in future
+ // once more testing of the performance of the simple allocator has been done
+ if (ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_POOLED_ALLOCATOR))
+ {
+ // Not sure what the original code wanted use :)
+ }
+ else
+ {
+ _logger.info("Using SimpleByteBufferAllocator");
+ ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+ }
+
+ _ioConnector = new SocketConnector();
+ SocketConnectorConfig cfg = (SocketConnectorConfig) _ioConnector.getDefaultConfig();
+
+ // if we do not use our own thread model we get the MINA default which is to use
+ // its own leader-follower model
+ if (ClientConfiguration.get().getBoolean(QpidConstants.USE_SHARED_READ_WRITE_POOL))
+ {
+ cfg.setThreadModel(ReadWriteThreadModel.getInstance());
+ }
+
+ SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+ scfg.setTcpNoDelay(ClientConfiguration.get().getBoolean(QpidConstants.TCP_NO_DELAY));
+ scfg.setSendBufferSize(ClientConfiguration.get().getInt(QpidConstants.SEND_BUFFER_SIZE_IN_KB)*1024);
+ scfg.setReceiveBufferSize(ClientConfiguration.get().getInt(QpidConstants.RECEIVE_BUFFER_SIZE_IN_KB)*1024);
+ }
+
+ // Returns the phase pipe
+ public Phase connect() throws AMQPException
+ {
+ _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
+ _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
+
+ _phase = PhaseFactory.createPhasePipe(_ctx);
+ _phase.start();
+
+ return _phase;
+ }
+
+ public void close() throws AMQPException
+ {
+
+ }
+
+ public Phase getPhasePipe()
+ {
+ return _phase;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java
new file mode 100644
index 0000000000..9df353a2df
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+public interface TransportConnection
+{
+ public Phase connect()throws AMQPException;
+
+ public void close()throws AMQPException;
+
+ public Phase getPhasePipe();
+} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
new file mode 100644
index 0000000000..ce2145c08b
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
@@ -0,0 +1,36 @@
+package org.apache.qpid.nclient.transport;
+
+import java.net.URISyntaxException;
+
+import org.apache.qpid.nclient.core.PhaseContext;
+
+public class TransportConnectionFactory
+{
+ public enum ConnectionType
+ {
+ TCP,VM
+ }
+
+ public static TransportConnection createTransportConnection(String url,ConnectionType type, PhaseContext ctx) throws URISyntaxException
+ {
+ return createTransportConnection(new AMQPConnectionURL(url),type,ctx);
+
+ }
+
+ public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type, PhaseContext ctx)
+ {
+ switch (type)
+ {
+ case TCP : default:
+ {
+ return new TCPConnection(url,ctx);
+ }
+
+ case VM :
+ {
+ return new VMConnection(url,ctx);
+ }
+ }
+
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
new file mode 100644
index 0000000000..911e855d4f
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
@@ -0,0 +1,266 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+import org.apache.mina.transport.vmpipe.VmPipeConnector;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.AbstractPhase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.ssl.BogusSSLContextFactory;
+
+/**
+ * The Transport Phase corresponds to the Layer 1 in AMQP It works at the Frame
+ * layer
+ *
+ */
+public class TransportPhase extends AbstractPhase implements IoHandler, ProtocolVersionList
+{
+
+ private static final Logger _logger = Logger
+ .getLogger(TransportPhase.class);
+
+ private IoSession _ioSession;
+ private BrokerDetails _brokerDetails;
+
+ protected WriteFuture _lastWriteFuture;
+
+ /**
+ * ------------------------------------------------
+ * Phase - methods introduced by Phase
+ * ------------------------------------------------
+ */
+
+ public void start()throws AMQPException
+ {
+ _brokerDetails = (BrokerDetails)_ctx.getProperty(QpidConstants.AMQP_BROKER_DETAILS);
+ IoConnector ioConnector = (IoConnector)_ctx.getProperty(QpidConstants.MINA_IO_CONNECTOR);
+
+ final SocketAddress address;
+ if (ioConnector instanceof VmPipeConnector)
+ {
+ address = new VmPipeAddress(_brokerDetails.getPort());
+ }
+ else
+ {
+ address = new InetSocketAddress(_brokerDetails.getHost(), _brokerDetails.getPort());
+ _logger.info("Attempting connection to " + address);
+
+ }
+
+ ConnectFuture future = ioConnector.connect(address,this);
+
+ // wait for connection to complete
+ if (future.join(_brokerDetails.getTimeout()))
+ {
+ // we call getSession which throws an IOException if there has been an error connecting
+ future.getSession();
+ }
+ else
+ {
+ throw new AMQPException("Timeout waiting for connection.");
+ }
+ }
+
+ public void messageReceived(Object frame) throws AMQPException
+ {
+ super.messageReceived(frame);
+ }
+
+ public void messageSent(Object frame) throws AMQPException
+ {
+ _ioSession.write(frame);
+ }
+
+ /**
+ * ------------------------------------------------
+ * IoHandler - methods introduced by IoHandler
+ * ------------------------------------------------
+ */
+ public void sessionIdle(IoSession session, IdleStatus status)
+ throws Exception
+ {
+ _logger.debug("Protocol Session for [ " + this + " : " + session + "] idle: "
+ + status);
+ if (IdleStatus.WRITER_IDLE.equals(status))
+ {
+ // write heartbeat frame:
+ _logger.debug("Sent heartbeat");
+ session.write(HeartbeatBody.FRAME);
+ // HeartbeatDiagnostics.sent();
+ } else if (IdleStatus.READER_IDLE.equals(status))
+ {
+ // failover:
+ // HeartbeatDiagnostics.timeout();
+ _logger.warn("Timed out while waiting for heartbeat from peer.");
+ session.close();
+ }
+ }
+
+ public void messageReceived(IoSession session, Object message)
+ throws Exception
+ {
+ AMQFrame frame = (AMQFrame) message;
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ if (bodyFrame instanceof HeartbeatBody)
+ {
+ _logger.debug("Received heartbeat");
+ } else
+ {
+ messageReceived(frame);
+ }
+ }
+
+ public void messageSent(IoSession session, Object message) throws Exception
+ {
+ _logger.debug("Sent frame " + message);
+ }
+
+ public void exceptionCaught(IoSession session, Throwable cause)
+ throws Exception
+ {
+ // Need to handle failover
+ _logger.info("Exception caught for [ " + this + " : Session " + System.identityHashCode(session) + "]",cause);
+ //sessionClosed(session);
+ }
+
+ public void sessionClosed(IoSession session) throws Exception
+ {
+ // Need to handle failover
+ _logger.info("Protocol Session for [ " + this + " : " + System.identityHashCode(session) + "] closed");
+ }
+
+ public void sessionCreated(IoSession session) throws Exception
+ {
+ _logger.info("Protocol session created for " + this + " session : "
+ + System.identityHashCode(session));
+
+ final ProtocolCodecFilter pcf = new ProtocolCodecFilter(
+ new AMQCodecFactory(false));
+
+ if (ClientConfiguration.get().getBoolean(
+ QpidConstants.USE_SHARED_READ_WRITE_POOL))
+ {
+ session.getFilterChain().addBefore("AsynchronousWriteFilter",
+ "protocolFilter", pcf);
+ }
+ else
+ {
+ session.getFilterChain().addLast("protocolFilter", pcf);
+ }
+ // we only add the SSL filter where we have an SSL connection
+ if (_brokerDetails.useSSL())
+ {
+ // FIXME: Bogus context cannot be used in production.
+ SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory
+ .getInstance(false));
+ sslFilter.setUseClientMode(true);
+ session.getFilterChain().addBefore("protocolFilter", "ssl",
+ sslFilter);
+ }
+
+ try
+ {
+
+ ReadWriteThreadModel threadModel = ReadWriteThreadModel
+ .getInstance();
+ threadModel.getAsynchronousReadFilter().createNewJobForSession(
+ session);
+ threadModel.getAsynchronousWriteFilter().createNewJobForSession(
+ session);
+ } catch (RuntimeException e)
+ {
+ e.printStackTrace();
+ }
+
+ _ioSession = session;
+ doAMQPConnectionNegotiation();
+ }
+
+ public void sessionOpened(IoSession session) throws Exception
+ {
+ _logger.info("Protocol session opened for " + this + " : session "
+ + System.identityHashCode(session));
+ }
+
+ /**
+ * ----------------------------------------------------------
+ * Protocol related methods
+ * ----------------------------------------------------------
+ */
+ private void doAMQPConnectionNegotiation()
+ {
+ int i = pv.length - 1;
+ _logger.debug("Engaging in connection negotiation");
+ writeFrame(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+ }
+
+ /**
+ * ----------------------------------------------------------
+ * Write Operations
+ * ----------------------------------------------------------
+ */
+ public void writeFrame(AMQDataBlock frame)
+ {
+ writeFrame(frame, false);
+ }
+
+ public void writeFrame(AMQDataBlock frame, boolean wait)
+ {
+ WriteFuture f = _ioSession.write(frame);
+ if (wait)
+ {
+ // fixme -- time out?
+ f.join();
+ } else
+ {
+ _lastWriteFuture = f;
+ }
+ }
+
+ /**
+ * -----------------------------------------------------------
+ * Failover section
+ * -----------------------------------------------------------
+ */
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
new file mode 100644
index 0000000000..ba38848149
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
@@ -0,0 +1,135 @@
+package org.apache.qpid.nclient.transport;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
+import org.apache.mina.transport.vmpipe.VmPipeAddress;
+import org.apache.mina.transport.vmpipe.VmPipeConnector;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.DefaultPhaseContext;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.PhaseFactory;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.pool.PoolingFilter;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
+
+public class VMConnection implements TransportConnection
+{
+ private static final Logger _logger = Logger.getLogger(VMConnection.class);
+ private BrokerDetails _brokerDetails;
+ private IoConnector _ioConnector;
+ private Phase _phase;
+ private PhaseContext _ctx;
+
+ protected VMConnection(ConnectionURL url,PhaseContext ctx)
+ {
+ _brokerDetails = url.getBrokerDetails(0);
+ _ctx = ctx;
+
+ _ioConnector = new VmPipeConnector();
+ final IoServiceConfig cfg = _ioConnector.getDefaultConfig();
+ ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
+ PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService,
+ "AsynchronousReadFilter");
+ cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
+ PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService,
+ "AsynchronousWriteFilter");
+ cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
+ }
+
+ public Phase connect() throws AMQPException
+ {
+ createVMBroker();
+
+ _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
+ _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
+
+ _phase = PhaseFactory.createPhasePipe(_ctx);
+ _phase.start();
+
+ return _phase;
+
+ }
+
+ private void createVMBroker()throws AMQPException
+ {
+ _logger.info("Creating InVM Qpid.AMQP listening on port " + _brokerDetails.getPort());
+
+ VmPipeAcceptor acceptor = new VmPipeAcceptor();
+ IoServiceConfig config = acceptor.getDefaultConfig();
+ config.setThreadModel(ReadWriteThreadModel.getInstance());
+
+ IoHandlerAdapter provider = null;
+ try
+ {
+ VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort());
+ provider = createBrokerInstance(_brokerDetails.getPort());
+ acceptor.bind(pipe, provider);
+ _logger.info("Created InVM Qpid.AMQP listening on port " + _brokerDetails.getPort());
+ }
+ catch (IOException e)
+ {
+ _logger.error(e);
+ VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort());
+ acceptor.unbind(pipe);
+
+ throw new AMQPException("Error creating VM broker",e);
+ }
+ }
+
+ private IoHandlerAdapter createBrokerInstance(int port) throws AMQPException
+ {
+ String protocolProviderClass = ClientConfiguration.get().getString(QpidConstants.QPID_VM_BROKER_CLASS);
+ _logger.info("Creating Qpid protocol provider: " + protocolProviderClass);
+
+ // can't use introspection to get Provider as it is a server class.
+ // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access.
+
+ //get correct constructor and pass in instancec ID - "port"
+ IoHandlerAdapter provider;
+ try
+ {
+ Class[] cnstr = {Integer.class};
+ Object[] params = {port};
+ provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+ //Give the broker a second to create
+ _logger.info("Created VMBroker Instance:" + port);
+ }
+ catch (Exception e)
+ {
+ _logger.info("Unable to create InVM Qpid broker on port " + port + ". due to : " + e.getCause());
+ _logger.error(e);
+ String because;
+ if (e.getCause() == null)
+ {
+ because = e.toString();
+ }
+ else
+ {
+ because = e.getCause().toString();
+ }
+
+
+ throw new AMQPException(port, because + " Stopped InVM Qpid.AMQP creation",e);
+ }
+
+ return provider;
+ }
+
+ public void close() throws AMQPException
+ {
+
+ }
+
+ public Phase getPhasePipe()
+ {
+ return _phase;
+ }
+}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java b/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java
new file mode 100644
index 0000000000..e161e30227
--- /dev/null
+++ b/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java
@@ -0,0 +1,14 @@
+package org.apache.qpid.nclient.util;
+
+import org.apache.qpid.nclient.core.AMQPException;
+
+public class AMQPValidator
+{
+ public static void throwExceptionOnNull(Object obj, String msg) throws AMQPException
+ {
+ if(obj == null)
+ {
+ throw new AMQPException(msg);
+ }
+ }
+}