summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/common/protocol-version.xml4
-rw-r--r--java/newclient/.project17
-rw-r--r--java/newclient/pom.xml194
-rw-r--r--java/newclient/src/main/java/client.log4j29
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java43
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java98
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java56
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java73
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java60
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java47
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxDemarcation.java38
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java39
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java94
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java76
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java49
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java41
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java11
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java78
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java11
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java368
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java286
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java448
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java121
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java224
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java92
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java256
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java130
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java123
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java84
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java77
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java74
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java39
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java454
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java66
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java68
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java8
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java26
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java13
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java50
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java47
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java100
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml37
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java55
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java97
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java20
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java38
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java35
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java63
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java67
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java19
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java188
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java165
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java240
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java122
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java679
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java64
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java16
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java39
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java71
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java30
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java103
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java75
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java46
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java60
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java105
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java62
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java344
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java412
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java73
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java77
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java83
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java34
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java36
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java266
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java135
-rw-r--r--java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java14
76 files changed, 2 insertions, 8180 deletions
diff --git a/java/common/protocol-version.xml b/java/common/protocol-version.xml
index 8afc8a7de7..49cb2102b3 100644
--- a/java/common/protocol-version.xml
+++ b/java/common/protocol-version.xml
@@ -27,8 +27,8 @@
<property name="generated.dir" location="${generated.path}/${generated.package}" />
<property name="generated.timestamp" location="${generated.dir}/timestamp" />
<property name="xml.spec.dir" location="${topDirectoryLocation}/../specs" />
- <property name="xml.spec.deps" value="amqp.0-8.xml cluster.0-8.xml" />
- <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-8.xml ${xml.spec.dir}/cluster.0-8.xml" />
+ <property name="xml.spec.deps" value="amqp.0-8.xml cluster.0-8.xml message.0-9.xml" />
+ <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-8.xml ${xml.spec.dir}/cluster.0-8.xml ${xml.spec.dir}/message.0-9.xml" />
<target name="generate" depends="compile_generator,check_generate_deps" unless="generation.notRequired">
<mkdir dir="${generated.dir}"/>
diff --git a/java/newclient/.project b/java/newclient/.project
deleted file mode 100644
index 4d42311982..0000000000
--- a/java/newclient/.project
+++ /dev/null
@@ -1,17 +0,0 @@
-<projectDescription>
- <name>qpid-newclient</name>
- <comment/>
- <projects>
- <project>qpid-broker</project>
- <project>qpid-common</project>
- </projects>
- <buildSpec>
- <buildCommand>
- <name>org.eclipse.jdt.core.javabuilder</name>
- <arguments/>
- </buildCommand>
- </buildSpec>
- <natures>
- <nature>org.eclipse.jdt.core.javanature</nature>
- </natures>
-</projectDescription> \ No newline at end of file
diff --git a/java/newclient/pom.xml b/java/newclient/pom.xml
deleted file mode 100644
index 5a82feb1d2..0000000000
--- a/java/newclient/pom.xml
+++ /dev/null
@@ -1,194 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-newclient</artifactId>
- <packaging>jar</packaging>
- <version>1.0-incubating-M2-SNAPSHOT</version>
- <name>Qpid New Client</name>
- <url>http://cwiki.apache.org/confluence/display/qpid</url>
-
- <parent>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid</artifactId>
- <version>1.0-incubating-M2-SNAPSHOT</version>
- </parent>
-
- <properties>
- <topDirectoryLocation>..</topDirectoryLocation>
- <java.source.version>1.5</java.source.version>
- <qpid.version>${pom.version}</qpid.version>
- <qpid.targetDir>${project.build.directory}</qpid.targetDir>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-common</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jms_1.1_spec</artifactId>
- </dependency>
-
- <dependency>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- </dependency>
-
- <dependency>
- <groupId>commons-configuration</groupId>
- <artifactId>commons-configuration</artifactId>
- <version>1.3</version>
- </dependency>
-
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.mina</groupId>
- <artifactId>mina-filter-ssl</artifactId>
- </dependency>
-
- <!-- Test Dependencies -->
- <dependency> <!-- for inVm Broker -->
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-broker</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>jmscts</groupId>
- <artifactId>jmscts</artifactId>
- <version>0.5-b2</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>jms</groupId>
- <artifactId>jms</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymockclassextension</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <systemProperties>
- <property>
- <name>amqj.noAutoCreateVMBroker</name>
- <value>true</value>
- </property>
- <property>
- <name>amqj.logging.level</name>
- <value>${amqj.logging.level}</value>
- </property>
- <property>
- <name>log4j.configuration</name>
- <value>file:///${basedir}/src/main/java/client.log4j</value>
- </property>
- </systemProperties>
- </configuration>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
-
-<!-- The inclusion of this resource causes the build to hang. -->
- <!--resources>
- <resource>
- <targetPath>META-INF/</targetPath>
- <filtering>false</filtering>
- <directory>../resources/META-INF</directory>
- <includes>
- <include>**</include>
- </includes>
- </resource>
- </resources-->
-
- <testResources>
- <testResource>
- <targetPath>META-INF/</targetPath>
- <filtering>false</filtering>
- <directory>../resources/META-INF</directory>
- <includes>
- <include>**</include>
- </includes>
- </testResource>
- <testResource>
- <targetPath>src/</targetPath>
- <filtering>false</filtering>
- <directory>src/test/java</directory>
- <includes>
- <include>**/*.java</include>
- </includes>
- </testResource>
-
- <testResource>
- <targetPath></targetPath>
- <filtering>false</filtering>
- <directory>src/main/java</directory>
- <includes>
- <include>client.log4j</include>
- </includes>
- </testResource>
- </testResources>
-
- </build>
-
-</project>
diff --git a/java/newclient/src/main/java/client.log4j b/java/newclient/src/main/java/client.log4j
deleted file mode 100644
index c2c7326479..0000000000
--- a/java/newclient/src/main/java/client.log4j
+++ /dev/null
@@ -1,29 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-log4j.rootLogger=DEBUG
-
-
-#log4j.logger.org.apache.qpid=${amqj.logging.level}, console
-log4j.logger.org.apache.qpid=DEBUG, console
-log4j.additivity.org.apache.qpid=false
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=all
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java
deleted file mode 100644
index a733fb7db7..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQMethodBody;
-
-public abstract class AMQPCallBack
-{
- private boolean _isComplete = false;
-
- public abstract void brokerResponded(AMQMethodBody body);
-
- public abstract void brokerRespondedWithError(AMQException e);
-
- public void setIsComplete(boolean isComplete)
- {
- _isComplete = isComplete;
- }
-
- public boolean isComplete()
- {
- return _isComplete;
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
deleted file mode 100644
index f984e812c2..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp;
-
-import java.security.SecureRandom;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.QpidConstants;
-
-public abstract class AMQPCallBackSupport
-{
- private SecureRandom _localCorrelationIdGenerator = new SecureRandom();
- protected ConcurrentHashMap<Long,AMQPCallBack> _cbMap = new ConcurrentHashMap<Long,AMQPCallBack>();
-
- //the channelId assigned for this instance
- protected int _channelId;
-
- public AMQPCallBackSupport(int channelId)
- {
- _channelId = channelId;
- }
-
- private long getNextCorrelationId()
- {
- return _localCorrelationIdGenerator.nextLong();
- }
-
-
- // For methods that still use nowait, hopefully they will remove nowait
- protected AMQPMethodEvent handleNoWait(boolean noWait,AMQMethodBody methodBody,AMQPCallBack cb)
- {
- if(noWait)
- {
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID);
- return msg;
- }
- else
- {
- // u only need to register if u are expecting a response
- long localCorrelationId = getNextCorrelationId();
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
- _cbMap.put(localCorrelationId, cb);
- return msg;
- }
- }
-
- protected AMQPMethodEvent handleAsynchronousCall(AMQMethodBody methodBody,AMQPCallBack cb)
- {
- long localCorrelationId = getNextCorrelationId();
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
- _cbMap.put(localCorrelationId, cb);
- return msg;
- }
-
- protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody)throws AMQPException
- {
- if(_cbMap.containsKey(localCorrelationId))
- {
- AMQPCallBack cb = (AMQPCallBack)_cbMap.get(localCorrelationId);
- if(cb == null)
- {
- throw new AMQPException("Unable to find the callback object responsible for handling " + methodBody);
- }
- else
- {
- cb.setIsComplete(true);
- cb.brokerResponded(methodBody);
- }
- _cbMap.remove(localCorrelationId);
- }
- else
- {
- //ignore, as this event is for another class instance
- }
- }
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
deleted file mode 100644
index cbb60b130d..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp;
-
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ChannelOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.ChannelResumeBody;
-import org.apache.qpid.nclient.core.AMQPException;
-
-public interface AMQPChannel
-{
-
- /**
- * Opens the channel
- */
- public abstract ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException;
-
- /**
- * Close the channel
- */
- public abstract ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException;
-
- /**
- * Channel Flow
- */
- public abstract ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException;
-
- /**
- * Close the channel
- */
- public abstract ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException;
-
-} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
deleted file mode 100644
index 4976daa4fa..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp;
-
-import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
-import org.apache.qpid.nclient.amqp.qpid.QpidAMQPChannel;
-import org.apache.qpid.nclient.amqp.qpid.QpidAMQPExchange;
-import org.apache.qpid.nclient.amqp.qpid.QpidAMQPMessage;
-import org.apache.qpid.nclient.amqp.qpid.QpidAMQPQueue;
-import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.transport.ConnectionURL;
-import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
-import org.apache.qpid.url.URLSyntaxException;
-
-public interface AMQPClassFactory
-{
-
- public abstract AMQPConnection createConnectionClass(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException;
-
- public abstract AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException;
-
- public abstract AMQPChannel createChannelClass(int channel) throws AMQPException;
-
- public abstract void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException;
-
- public abstract AMQPExchange createExchangeClass(int channel) throws AMQPException;
-
- public abstract void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException;
-
- public abstract AMQPQueue createQueueClass(int channel) throws AMQPException;
-
- public abstract void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException;
-
- public abstract AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException;
-
- public abstract void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException;
-
- /**
- * Extention point
- * Other interested parties can obtain a reference to the event manager
- * and add listeners to get notified of events
- *
- */
- public abstract AMQPEventManager getEventManager();
-
- /**
- * Extention point
- * Other interested parties can obtain a reference to the state manager
- * and add listeners to get notified of state changes
- *
- */
- public abstract AMQPStateManager getStateManager();
-
-} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
deleted file mode 100644
index b18fed5605..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.nclient.core.AMQPException;
-
-public interface AMQPConnection
-{
-
- /**
- * Opens the TCP connection and let the formalities begin.
- */
- public abstract ConnectionStartBody openTCPConnection() throws AMQPException;
-
- /**
- * The current java broker implementation can send a connection tune body
- * as a response to the startOk. Not sure if that is the correct behaviour.
- */
- public abstract AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException;
-
- /**
- * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
- * issue a new challenge
- */
- public abstract AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException;
-
- public abstract void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException;
-
- public abstract ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException;
-
- public abstract ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException;
-
-} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java
deleted file mode 100644
index 53e11c48fb..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp;
-
-import org.apache.qpid.framing.DtxCoordinationCommitBody;
-import org.apache.qpid.framing.DtxCoordinationForgetBody;
-import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody;
-import org.apache.qpid.framing.DtxCoordinationPrepareBody;
-import org.apache.qpid.framing.DtxCoordinationRecoverBody;
-import org.apache.qpid.framing.DtxCoordinationRollbackBody;
-import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody;
-import org.apache.qpid.nclient.core.AMQPException;
-
-public interface AMQPDtxCoordination
-{
- public void commit(DtxCoordinationCommitBody dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException;
-
- public void forget(DtxCoordinationForgetBody dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException;
-
- public void getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException;
-
- public void prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException;
-
- public void recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException;
-
- public void rollback(DtxCoordinationRollbackBody dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException;
-
- public void setTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException;
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxDemarcation.java
deleted file mode 100644
index 41f1414205..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxDemarcation.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp;
-
-import org.apache.qpid.framing.DtxDemarcationEndBody;
-import org.apache.qpid.framing.DtxDemarcationEndOkBody;
-import org.apache.qpid.framing.DtxDemarcationSelectBody;
-import org.apache.qpid.framing.DtxDemarcationSelectOkBody;
-import org.apache.qpid.framing.DtxDemarcationStartBody;
-import org.apache.qpid.framing.DtxDemarcationStartOkBody;
-import org.apache.qpid.nclient.core.AMQPException;
-
-public interface AMQPDtxDemarcation
-{
- public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException;
-
- public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException;
-
- public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException;
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
deleted file mode 100644
index c2f93b8f42..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp;
-
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.nclient.core.AMQPException;
-
-public interface AMQPExchange
-{
-
- /**
- * -----------------------------------------------
- * API Methods
- * -----------------------------------------------
- */
- public abstract void declare(ExchangeDeclareBody exchangeDeclareBody, AMQPCallBack cb) throws AMQPException;
-
- public abstract void delete(ExchangeDeleteBody exchangeDeleteBody, AMQPCallBack cb) throws AMQPException;
-
-} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
deleted file mode 100644
index 61c5825fe0..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp;
-
-import org.apache.qpid.framing.MessageAppendBody;
-import org.apache.qpid.framing.MessageCancelBody;
-import org.apache.qpid.framing.MessageCheckpointBody;
-import org.apache.qpid.framing.MessageCloseBody;
-import org.apache.qpid.framing.MessageConsumeBody;
-import org.apache.qpid.framing.MessageGetBody;
-import org.apache.qpid.framing.MessageOffsetBody;
-import org.apache.qpid.framing.MessageOkBody;
-import org.apache.qpid.framing.MessageOpenBody;
-import org.apache.qpid.framing.MessageQosBody;
-import org.apache.qpid.framing.MessageRecoverBody;
-import org.apache.qpid.framing.MessageRejectBody;
-import org.apache.qpid.framing.MessageResumeBody;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.nclient.core.AMQPException;
-
-public interface AMQPMessage
-{
-
- /**
- * -----------------------------------------------
- * API Methods
- * -----------------------------------------------
- */
-
- public abstract void transfer(MessageTransferBody messageTransferBody, AMQPCallBack cb) throws AMQPException;
-
- public abstract void consume(MessageConsumeBody messageConsumeBody, AMQPCallBack cb) throws AMQPException;
-
- public abstract void cancel(MessageCancelBody messageCancelBody, AMQPCallBack cb) throws AMQPException;
-
- public abstract void get(MessageGetBody messageGetBody, AMQPCallBack cb) throws AMQPException;
-
- public abstract void recover(MessageRecoverBody messageRecoverBody, AMQPCallBack cb) throws AMQPException;
-
- public abstract void open(MessageOpenBody messageOpenBody, AMQPCallBack cb) throws AMQPException;
-
- public abstract void close(MessageCloseBody messageCloseBody, AMQPCallBack cb) throws AMQPException;
-
- public abstract void append(MessageAppendBody messageAppendBody, AMQPCallBack cb) throws AMQPException;
-
- public abstract void checkpoint(MessageCheckpointBody messageCheckpointBody, AMQPCallBack cb) throws AMQPException;
-
- public abstract void resume(MessageResumeBody messageResumeBody, AMQPCallBack cb) throws AMQPException;
-
- public abstract void qos(MessageQosBody messageQosBody, AMQPCallBack cb) throws AMQPException;
-
- /**
- * The correlationId from the request.
- * For example if a message.transfer is sent with correlationId "ABCD"
- * then u need to pass that in. This correlation id is used by the execution layer
- * to handle the correlation of method requests and responses
- */
- public abstract void ok(MessageOkBody messageOkBody, long correlationId) throws AMQPException;
-
- /**
- * The correlationId from the request.
- * For example if a message.transfer is sent with correlationId "ABCD"
- * then u need to pass that in. This correlation id is used by the execution layer
- * to handle the correlation of method requests and responses
- */
- public abstract void reject(MessageRejectBody messageRejectBody, long correlationId) throws AMQPException;
-
- /**
- * The correlationId from the request.
- * For example if a message.resume is sent with correlationId "ABCD"
- * then u need to pass that in. This correlation id is used by the execution layer
- * to handle the correlation of method requests and responses
- */
- public abstract void offset(MessageOffsetBody messageOffsetBody, long correlationId) throws AMQPException;
-
-} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java
deleted file mode 100644
index c10d6975c6..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp;
-
-import org.apache.qpid.framing.MessageAppendBody;
-import org.apache.qpid.framing.MessageCheckpointBody;
-import org.apache.qpid.framing.MessageCloseBody;
-import org.apache.qpid.framing.MessageOpenBody;
-import org.apache.qpid.framing.MessageRecoverBody;
-import org.apache.qpid.framing.MessageResumeBody;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.nclient.core.AMQPException;
-
-/**
- * This class also represents the AMQP Message class.
- * You need an instance per channel.
- * This is passed in as an argument in the constructor of an AMQPMessage instance.
- * A client who implements this interface is notified When the broker issues
- * Message class methods on the client.
- *
- * A Client should use the AMQPMessage class when it wants to issue Message class
- * methods on the broker.
- *
- * A JMS MessageConsumer implementation can implement this interface and map
- * AMQP Method notifications to the appropriate JMS methods.
- *
- * Simillarly a JMS MessageProducer implementation can wrap an AMQPMessage instance.
- *
- */
-
-public interface AMQPMessageCallBack
-{
- /**
- * -----------------------------------------------------------------------
- * This provides Notifications for broker initiated Message class methods.
- * All methods have a correlationId that u need to pass into
- * the corresponding Message methods when responding to the broker.
- *
- * For example the correlationID passed in from Message.trasnfer
- * should be passed back when u call Message.ok in AMQPMessage
- * -----------------------------------------------------------------------
- */
-
-
- public void transfer(MessageTransferBody messageTransferBody,long correlationId) throws AMQPException;
-
- public void recover(MessageRecoverBody messageRecoverBody,long correlationId) throws AMQPException;
-
- public void open(MessageOpenBody messageOpenBody,long correlationId) throws AMQPException ;
-
- public void close(MessageCloseBody messageCloseBody,long correlationId) throws AMQPException;
-
- public void append(MessageAppendBody messageAppendBody,long correlationId) throws AMQPException;
-
- public void checkpoint(MessageCheckpointBody messageCheckpointBody,long correlationId) throws AMQPException;
-
- public void resume(MessageResumeBody messageResumeBody,long correlationId) throws AMQPException;
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
deleted file mode 100644
index ff26c6adf5..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp;
-
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueuePurgeBody;
-import org.apache.qpid.framing.QueueUnbindBody;
-import org.apache.qpid.nclient.core.AMQPException;
-
-public interface AMQPQueue
-{
-
- /**
- * -----------------------------------------------
- * API Methods
- * -----------------------------------------------
- */
- public abstract void declare(QueueDeclareBody queueDeclareBody, AMQPCallBack cb) throws AMQPException;
-
- public abstract void bind(QueueBindBody queueBindBody, AMQPCallBack cb) throws AMQPException;
-
- // Queue.unbind doesn't have nowait
- public abstract void unbind(QueueUnbindBody queueUnbindBody, AMQPCallBack cb) throws AMQPException;
-
- public abstract void purge(QueuePurgeBody queuePurgeBody, AMQPCallBack cb) throws AMQPException;
-
- public abstract void delete(QueueDeleteBody queueDeleteBody, AMQPCallBack cb) throws AMQPException;
-
-} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java
deleted file mode 100644
index 6d0e83bb7e..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp;
-
-import org.apache.qpid.nclient.config.ClientConfiguration;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.QpidConstants;
-
-public class AbstractAMQPClassFactory
-{
- public static AMQPClassFactory getFactoryInstance() throws AMQPException
- {
- String className = ClientConfiguration.get().getString(QpidConstants.AMQP_CLASS_FACTORY);
- try
- {
- return (AMQPClassFactory)Class.forName(className).newInstance();
- }
- catch(Exception e)
- {
- throw new AMQPException("Error creating AMQPClassFactory",e);
- }
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java
deleted file mode 100644
index f682659f9e..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.qpid.nclient.amqp.event;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.nclient.core.AMQPException;
-
-public interface AMQPEventManager
-{
- public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l);
- public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l);
- public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException;
-} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java
deleted file mode 100644
index c6641890e0..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package org.apache.qpid.nclient.amqp.event;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-/**
- * This class is exactly the same as the AMQMethod event.
- * Except I renamed requestId to corelationId, so I could use it both ways.
- *
- * I didn't want to modify anything in common so that there is no
- * impact on the existing code.
- *
- */
-public class AMQPMethodEvent<M extends AMQMethodBody>
-{
-
- private final M _method;
-
- private final int _channelId;
-
- /**
- * This is the rquest id from the broker when it sent me a request
- * when I respond I remember this id and copy this to the outgoing
- * response.
- */
- private final long _correlationId;
-
- /**
- * I could use _correlationId, bcos when I send a request
- * this field is blank and is only used internally. But I
- * used a seperate field to make it more clear.
- */
- private long _localCorrletionId = 0;
-
- public AMQPMethodEvent(int channelId, M method, long correlationId, long localCorrletionId)
- {
- _channelId = channelId;
- _method = method;
- _correlationId = correlationId;
- _localCorrletionId = localCorrletionId;
- }
-
- public AMQPMethodEvent(int channelId, M method, long correlationId)
- {
- _channelId = channelId;
- _method = method;
- _correlationId = correlationId;
- }
-
- public M getMethod()
- {
- return _method;
- }
-
- public int getChannelId()
- {
- return _channelId;
- }
-
- public long getCorrelationId()
- {
- return _correlationId;
- }
-
- public long getLocalCorrelationId()
- {
- return _localCorrletionId;
- }
-
- public String toString()
- {
- StringBuilder buf = new StringBuilder("Method event: \n");
- buf.append("Channel id: \n").append(_channelId);
- buf.append("Method: \n").append(_method);
- buf.append("Request Id: ").append(_correlationId);
- buf.append("Local Correlation Id: ").append(_localCorrletionId);
- return buf.toString();
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java
deleted file mode 100644
index e77a38121c..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.qpid.nclient.amqp.event;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.nclient.core.AMQPException;
-
-public interface AMQPMethodListener
-{
-
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException;
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
deleted file mode 100644
index decb120796..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.nclient.amqp.qpid;
-
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ChannelOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.ChannelResumeBody;
-import org.apache.qpid.nclient.amqp.AMQPChannel;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
-import org.apache.qpid.nclient.amqp.state.AMQPState;
-import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
-import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
-import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
-import org.apache.qpid.nclient.amqp.state.AMQPStateType;
-import org.apache.qpid.nclient.config.ClientConfiguration;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.util.AMQPValidator;
-
-/**
- * This represents the Channel class defined in the AMQP protocol. This class is a finite state machine and is thread
- * safe by design. Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. Only
- * one thread can enter the methods that change state, at a given time. The AMQP protocol recommends one thread per
- * channel by design.
- *
- * A JMS Session can wrap an instance of this class.
- */
-
-public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListener, AMQPChannel
-{
- private static final Logger _logger = Logger.getLogger(QpidAMQPChannel.class);
-
- // the channelId assigned for this channel
- private int _channelId;
-
- private Phase _phase;
-
- private AMQPState _currentState;
-
- private AMQPStateManager _stateManager;
-
- private final AMQPState[] _validCloseStates = new AMQPState[]
- { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
-
- private final AMQPState[] _validResumeStates = new AMQPState[]
- { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
-
- // The wait period until a server sends a respond
- private long _serverTimeOut = 1000;
-
- private final Lock _lock = new ReentrantLock();
-
- private final Condition _channelNotOpend = _lock.newCondition();
-
- private final Condition _channelNotClosed = _lock.newCondition();
-
- private final Condition _channelFlowNotResponded = _lock.newCondition();
-
- private final Condition _channelNotResumed = _lock.newCondition();
-
- private ChannelOpenOkBody _channelOpenOkBody;
-
- private ChannelCloseOkBody _channelCloseOkBody;
-
- private ChannelFlowOkBody _channelFlowOkBody;
-
- private ChannelOkBody _channelOkBody;
-
- private ChannelCloseBody _channelCloseBody;
-
- protected QpidAMQPChannel(int channelId, Phase phase, AMQPStateManager stateManager)
- {
- _channelId = channelId;
- _phase = phase;
- _stateManager = stateManager;
- _currentState = AMQPState.CHANNEL_NOT_OPENED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
- }
-
- /**
- * -------------------------------------------
- * API Methods
- * --------------------------------------------
- */
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPChannel#open(org.apache.qpid.framing.ChannelOpenBody)
- */
- public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _channelOpenOkBody = null;
- checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelNotOpend.await();
- checkIfChannelClosed();
- AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
- notifyState(AMQPState.CHANNEL_OPENED);
- _currentState = AMQPState.CHANNEL_OPENED;
- return _channelOpenOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.open", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPChannel#close(org.apache.qpid.framing.ChannelCloseBody)
- */
- public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _channelCloseOkBody = null;
- checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelNotClosed.await();
- AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
- notifyState(AMQPState.CHANNEL_CLOSED);
- _currentState = AMQPState.CHANNEL_CLOSED;
- return _channelCloseOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.close", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPChannel#flow(org.apache.qpid.framing.ChannelFlowBody)
- */
- public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _channelFlowOkBody = null;
- if (channelFlowBody.active)
- {
- checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED);
- }
- else
- {
- checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND);
- }
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelFlowNotResponded.await();
- checkIfChannelClosed();
- AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
- handleChannelFlowState(_channelFlowOkBody.active);
- return _channelFlowOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.flow", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPChannel#resume(org.apache.qpid.framing.ChannelResumeBody)
- */
- public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _channelOkBody = null;
- checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelNotResumed.await();
- checkIfChannelClosed();
- AMQPValidator.throwExceptionOnNull(_channelOkBody,
- "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
- notifyState(AMQPState.CHANNEL_OPENED);
- _currentState = AMQPState.CHANNEL_OPENED;
- return _channelOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.resume", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * -------------------------------------------
- * AMQPMethodListener methods
- * --------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- _lock.lock();
- try
- {
- if (evt.getMethod() instanceof ChannelOpenOkBody)
- {
- _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod();
- _channelNotOpend.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelCloseOkBody)
- {
- _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod();
- _channelNotClosed.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelCloseBody)
- {
- _channelCloseBody = (ChannelCloseBody) evt.getMethod();
- // release the correct lock as u may have some conditions waiting.
- // while an error occured and the broker has sent a close.
- releaseLocks();
- handleChannelClose(_channelCloseBody);
- return true;
- }
- else if (evt.getMethod() instanceof ChannelFlowOkBody)
- {
- _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod();
- _channelFlowNotResponded.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelFlowBody)
- {
- handleChannelFlow((ChannelFlowBody) evt.getMethod());
- return true;
- }
- else if (evt.getMethod() instanceof ChannelOkBody)
- {
- _channelOkBody = (ChannelOkBody) evt.getMethod();
- // In this case the only method expecting channel-ok is channel-resume
- // haven't implemented ping and pong.
- _channelNotResumed.signal();
- return true;
- }
- else
- {
- return false;
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- private void handleChannelClose(ChannelCloseBody channelCloseBody) throws AMQPException
- {
- notifyState(AMQPState.CHANNEL_CLOSED);
- _currentState = AMQPState.CHANNEL_CLOSED;
- // handle channel related cleanup
- }
-
- private void releaseLocks()
- {
- if (_currentState == AMQPState.CHANNEL_NOT_OPENED)
- {
- _channelNotOpend.signal();
- _channelNotResumed.signal(); // It could be a channel.resume call
- }
- else if (_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND)
- {
- _channelFlowNotResponded.signal();
- }
- else if (_currentState == AMQPState.CHANNEL_CLOSED)
- {
- _channelNotResumed.signal();
- }
- }
-
- private void checkIfChannelClosed() throws AMQPException
- {
- if (_channelCloseBody != null)
- {
- String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() + " with reply code ("
- + _channelCloseBody.getReplyCode() + ") " + "caused by class " + _channelCloseBody.getClassId() + " and method "
- + _channelCloseBody.getMethod();
-
- throw new AMQPException(error);
- }
- }
-
- private void handleChannelFlow(ChannelFlowBody channelFlowBody)throws AMQPException
- {
- _lock.lock();
- try
- {
- handleChannelFlowState(channelFlowBody.active);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- private void handleChannelFlowState(boolean flow)throws AMQPException
- {
- notifyState((flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND);
- _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
- }
-
- private void notifyState(AMQPState newState) throws AMQPException
- {
- _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE));
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java
deleted file mode 100644
index 809aa57dab..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.qpid;
-
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ChannelOkBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.ExchangeDeleteOkBody;
-import org.apache.qpid.framing.MessageAppendBody;
-import org.apache.qpid.framing.MessageCancelBody;
-import org.apache.qpid.framing.MessageCheckpointBody;
-import org.apache.qpid.framing.MessageCloseBody;
-import org.apache.qpid.framing.MessageGetBody;
-import org.apache.qpid.framing.MessageOffsetBody;
-import org.apache.qpid.framing.MessageOkBody;
-import org.apache.qpid.framing.MessageOpenBody;
-import org.apache.qpid.framing.MessageQosBody;
-import org.apache.qpid.framing.MessageRecoverBody;
-import org.apache.qpid.framing.MessageRejectBody;
-import org.apache.qpid.framing.MessageResumeBody;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.QueuePurgeOkBody;
-import org.apache.qpid.framing.QueueUnbindOkBody;
-import org.apache.qpid.nclient.amqp.AMQPChannel;
-import org.apache.qpid.nclient.amqp.AMQPClassFactory;
-import org.apache.qpid.nclient.amqp.AMQPConnection;
-import org.apache.qpid.nclient.amqp.AMQPExchange;
-import org.apache.qpid.nclient.amqp.AMQPMessage;
-import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
-import org.apache.qpid.nclient.amqp.AMQPQueue;
-import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
-import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.DefaultPhaseContext;
-import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.core.PhaseContext;
-import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.transport.AMQPConnectionURL;
-import org.apache.qpid.nclient.transport.ConnectionURL;
-import org.apache.qpid.nclient.transport.TransportConnection;
-import org.apache.qpid.nclient.transport.TransportConnectionFactory;
-import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
-import org.apache.qpid.url.URLSyntaxException;
-
-/**
- * The Class Factory creates AMQP Class
- * equivalents defined in the spec.
- *
- * There should one instance per connection.
- * The factory class creates all the support
- * classes and provides an instance of the
- * AMQP class in ready-to-use state.
- *
- */
-public class QpidAMQPClassFactory implements AMQPClassFactory
-{
- //Need an event manager per connection
- private AMQPEventManager _eventManager = new QpidEventManager();
-
- // Need a state manager per connection
- private AMQPStateManager _stateManager = new QpidStateManager();
-
- //Need a phase pipe per connection
- private Phase _phase;
-
- //One instance per connection
- private QpidAMQPConnection _amqpConnection;
-
- public QpidAMQPClassFactory()
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createConnection(java.lang.String, org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType)
- */
- public AMQPConnection createConnectionClass(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException
- {
- AMQPConnectionURL url = new AMQPConnectionURL(urlStr);
- return createConnectionClass(url, type);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createConnectionClass(org.apache.qpid.nclient.transport.ConnectionURL, org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType)
- */
- public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException
- {
- if (_amqpConnection == null)
- {
- PhaseContext ctx = new DefaultPhaseContext();
- ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
-
- TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx);
- _amqpConnection = new QpidAMQPConnection(conn, _stateManager);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection);
- }
- return _amqpConnection;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createChannelClass(int)
- */
- public AMQPChannel createChannelClass(int channel) throws AMQPException
- {
- checkIfConnectionStarted();
- QpidAMQPChannel amqpChannel = new QpidAMQPChannel(channel, _phase,_stateManager);
- _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
- return amqpChannel;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyChannelClass(int, org.apache.qpid.nclient.amqp.AMQPChannel)
- */
- public void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createExchangeClass(int)
- */
- public AMQPExchange createExchangeClass(int channel) throws AMQPException
- {
- checkIfConnectionStarted();
- QpidAMQPExchange amqpExchange = new QpidAMQPExchange(channel, _phase);
- _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
- _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
- return amqpExchange;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryExchangeClass(int, org.apache.qpid.nclient.amqp.AMQPExchange)
- */
- public void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
- _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createQueueClass(int)
- */
- public AMQPQueue createQueueClass(int channel) throws AMQPException
- {
- checkIfConnectionStarted();
- QpidAMQPQueue amqpQueue = new QpidAMQPQueue(channel, _phase);
- _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
- _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
- return amqpQueue;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyQueueClass(int, org.apache.qpid.nclient.amqp.AMQPQueue)
- */
- public void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessageCallBack)
- */
- public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException
- {
- checkIfConnectionStarted();
- QpidAMQPMessage amqpMessage = new QpidAMQPMessage(channel, _phase, messageCb);
- _eventManager.addMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageGetBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOkBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageQosBody.class, amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
-
- return amqpMessage;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessage)
- */
- public void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
- }
-
- //This class should register as a state listener for AMQPConnection
- private void checkIfConnectionStarted() throws AMQPException
- {
- if (_phase == null)
- {
- _phase = _amqpConnection.getPhasePipe();
-
- if (_phase == null)
- {
- throw new AMQPException("Cannot create a channel until connection is ready");
- }
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#getEventManager()
- */
- public AMQPEventManager getEventManager()
- {
- return _eventManager;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#getStateManager()
- */
- public AMQPStateManager getStateManager()
- {
- return _stateManager;
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java
deleted file mode 100644
index 9b4d776cc5..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.qpid;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.nclient.amqp.AMQPConnection;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
-import org.apache.qpid.nclient.amqp.state.AMQPState;
-import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
-import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
-import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
-import org.apache.qpid.nclient.amqp.state.AMQPStateType;
-import org.apache.qpid.nclient.config.ClientConfiguration;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.transport.TransportConnection;
-import org.apache.qpid.nclient.util.AMQPValidator;
-
-/**
- * This maps directly to the Connection class defined in the AMQP protocol This class is a finite state machine and is
- * thread safe by design A particular method (state changing) can only be invoked once and only in sequence or else an
- * IllegalStateTransitionException will be thrown Also only one thread can enter those methods at a given time.
- */
-public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodListener, AMQPConnection
-{
- private static final Logger _logger = Logger.getLogger(QpidAMQPConnection.class);
-
- private Phase _phase;
-
- private TransportConnection _connection;
-
- private long _correlationId;
-
- private AMQPState _currentState;
-
- private AMQPStateManager _stateManager;
-
- private final AMQPState[] _validCloseStates = new AMQPState[]
- { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED,
- AMQPState.CONNECTION_OPEN, };
-
- // The wait period until a server sends a respond
- private long _serverTimeOut = 1000;
-
- private final Lock _lock = new ReentrantLock();
-
- private final Condition _connectionNotStarted = _lock.newCondition();
-
- private final Condition _connectionNotSecure = _lock.newCondition();
-
- private final Condition _connectionNotTuned = _lock.newCondition();
-
- private final Condition _connectionNotOpened = _lock.newCondition();
-
- private final Condition _connectionNotClosed = _lock.newCondition();
-
- private ConnectionStartBody _connectionStartBody;
-
- private ConnectionSecureBody _connectionSecureBody;
-
- private ConnectionTuneBody _connectionTuneBody;
-
- private ConnectionOpenOkBody _connectionOpenOkBody;
-
- private ConnectionCloseOkBody _connectionCloseOkBody;
-
- private ConnectionCloseBody _connectionCloseBody;
-
- protected QpidAMQPConnection(TransportConnection connection, AMQPStateManager stateManager)
- {
- _connection = connection;
- _stateManager = stateManager;
- _currentState = AMQPState.CONNECTION_UNDEFINED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
- }
-
- /**
- * -------------------------------------------
- * API Methods
- * --------------------------------------------
- */
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPConnection#openTCPConnection()
- */
- public ConnectionStartBody openTCPConnection() throws AMQPException
- {
- _lock.lock();
- // open the TCP connection
- try
- {
- _connectionStartBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
- _phase = _connection.connect();
-
- // waiting for ConnectionStartBody or error in connection
- //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotStarted.await();
-
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
- notifyState(AMQPState.CONNECTION_NOT_STARTED);
- _currentState = AMQPState.CONNECTION_NOT_STARTED;
- return _connectionStartBody;
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error opening connection to broker", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPConnection#startOk(org.apache.qpid.framing.ConnectionStartOkBody)
- */
- public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionSecureBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
- _phase.messageSent(msg);
- // _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS);
- _connectionNotSecure.await();
-
- checkIfConnectionClosed();
- if (_connectionTuneBody != null)
- {
- notifyState(AMQPState.CONNECTION_NOT_TUNED);
- _currentState = AMQPState.CONNECTION_NOT_TUNED;
- return _connectionTuneBody;
- }
- else if (_connectionSecureBody != null)
- { // oops the server sent another challenge
- notifyState(AMQPState.CONNECTION_NOT_SECURE);
- _currentState = AMQPState.CONNECTION_NOT_SECURE;
- return _connectionSecureBody;
- }
- else
- {
- throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
- }
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.startOk", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPConnection#secureOk(org.apache.qpid.framing.ConnectionSecureOkBody)
- */
- public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionTuneBody = null;
- _connectionSecureBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
-
- _connectionSecureBody = null; // The server could send a fresh challenge
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
- _phase.messageSent(msg);
-
- //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotTuned.await();
- checkIfConnectionClosed();
-
- if (_connectionTuneBody != null)
- {
- notifyState(AMQPState.CONNECTION_NOT_TUNED);
- _currentState = AMQPState.CONNECTION_NOT_TUNED;
- return _connectionTuneBody;
- }
- else if (_connectionSecureBody != null)
- { // oops the server sent another challenge
- notifyState(AMQPState.CONNECTION_NOT_SECURE);
- _currentState = AMQPState.CONNECTION_NOT_SECURE;
- return _connectionSecureBody;
- }
- else
- {
- throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
- }
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.secureOk", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPConnection#tuneOk(org.apache.qpid.framing.ConnectionTuneOkBody)
- */
- public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED);
- _connectionSecureBody = null;
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId);
- _phase.messageSent(msg);
- notifyState(AMQPState.CONNECTION_NOT_OPENED);
- _currentState = AMQPState.CONNECTION_NOT_OPENED;
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPConnection#open(org.apache.qpid.framing.ConnectionOpenBody)
- */
- public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- // If the broker sends a connection close due to an error with the
- // Connection tune ok, then this call will verify that
- checkIfConnectionClosed();
-
- _connectionOpenOkBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotOpened.await();
-
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
- notifyState(AMQPState.CONNECTION_OPEN);
- _currentState = AMQPState.CONNECTION_OPEN;
- return _connectionOpenOkBody;
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.open", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPConnection#close(org.apache.qpid.framing.ConnectionCloseBody)
- */
- public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionCloseOkBody = null;
- checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
- _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
- notifyState(AMQPState.CONNECTION_CLOSED);
- _currentState = AMQPState.CONNECTION_CLOSED;
- return _connectionCloseOkBody;
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.close", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * -------------------------------------------
- * AMQMethodListener methods
- * --------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- _lock.lock();
- try
- {
- _correlationId = evt.getCorrelationId();
-
- if (evt.getMethod() instanceof ConnectionStartBody)
- {
- _connectionStartBody = (ConnectionStartBody) evt.getMethod();
- _connectionNotStarted.signalAll();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionSecureBody)
- {
- _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
- _connectionNotSecure.signal();
- _connectionNotTuned.signal(); // in case the server has sent another chanllenge
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionTuneBody)
- {
- _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
- _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk
- _connectionNotTuned.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionOpenOkBody)
- {
- _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
- _connectionNotOpened.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionCloseOkBody)
- {
- _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
- _connectionNotClosed.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionCloseBody)
- {
- _connectionCloseBody = (ConnectionCloseBody) evt.getMethod();
- // release the correct lock as u may have some conditions waiting.
- // while an error occured and the broker has sent a close.
- releaseLocks();
- handleClose();
- return true;
- }
- else
- {
- return false;
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
-
-
- public Phase getPhasePipe()
- {
- return _phase;
- }
-
- private void handleClose() throws AMQPException
- {
- try
- {
- notifyState(AMQPState.CONNECTION_CLOSING);
- _currentState = AMQPState.CONNECTION_CLOSING;
- // do the required cleanup and send a ConnectionCloseOkBody
- }
- catch (Exception e)
- {
- throw new AMQPException("Error handling connection.close from broker", e);
- }
- }
-
- private void checkIfConnectionClosed() throws AMQPException
- {
- if (_connectionCloseBody != null)
- {
- String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + " with reply code ("
- + _connectionCloseBody.getReplyCode() + ") " + "caused by class " + _connectionCloseBody.getClassId() + " and method "
- + _connectionCloseBody.getMethod();
-
- throw new AMQPException(error);
- }
- }
-
- private void releaseLocks()
- {
- if (_currentState == AMQPState.CONNECTION_NOT_OPENED)
- {
- _connectionNotOpened.signal();
- }
- else if (_currentState == AMQPState.CONNECTION_UNDEFINED)
- {
- _connectionNotStarted.signal();
- }
- else if (_currentState == AMQPState.CONNECTION_NOT_STARTED)
- {
- _connectionNotSecure.signal();
- }
- else if (_currentState == AMQPState.CONNECTION_NOT_SECURE)
- {
- _connectionNotTuned.signal();
- }
- }
-
- private void notifyState(AMQPState newState) throws AMQPException
- {
- _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CONNECTION_STATE));
- }
-
-} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java
deleted file mode 100644
index 70ea6bd27d..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.nclient.amqp.qpid;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.DtxCoordinationCommitBody;
-import org.apache.qpid.framing.DtxCoordinationCommitOkBody;
-import org.apache.qpid.framing.DtxCoordinationForgetBody;
-import org.apache.qpid.framing.DtxCoordinationForgetOkBody;
-import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody;
-import org.apache.qpid.framing.DtxCoordinationGetTimeoutOkBody;
-import org.apache.qpid.framing.DtxCoordinationPrepareBody;
-import org.apache.qpid.framing.DtxCoordinationPrepareOkBody;
-import org.apache.qpid.framing.DtxCoordinationRecoverBody;
-import org.apache.qpid.framing.DtxCoordinationRecoverOkBody;
-import org.apache.qpid.framing.DtxCoordinationRollbackBody;
-import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody;
-import org.apache.qpid.nclient.amqp.AMQPCallBack;
-import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
-import org.apache.qpid.nclient.amqp.AMQPDtxCoordination;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.Phase;
-
-public class QpidAMQPDtxCoordination extends AMQPCallBackSupport implements AMQPMethodListener, AMQPDtxCoordination
-{
- private Phase _phase;
-
- protected QpidAMQPDtxCoordination(int channelId,Phase phase)
- {
- super(channelId);
- _phase = phase;
- }
-
- public void commit(DtxCoordinationCommitBody dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationCommitBody,cb);
- _phase.messageSent(msg);
- }
-
- public void forget(DtxCoordinationForgetBody dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationForgetBody,cb);
- _phase.messageSent(msg);
- }
-
- public void getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationGetTimeoutBody,cb);
- _phase.messageSent(msg);
- }
-
- public void rollback(DtxCoordinationRollbackBody dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationRollbackBody,cb);
- _phase.messageSent(msg);
- }
-
- public void prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationPrepareBody,cb);
- _phase.messageSent(msg);
- }
-
- public void recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationRecoverBody,cb);
- _phase.messageSent(msg);
- }
-
- public void setTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationSetTimeoutBody,cb);
- _phase.messageSent(msg);
- }
-
- /**-------------------------------------------
- * AMQPMethodListener methods
- *--------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- long localCorrelationId = evt.getLocalCorrelationId();
- AMQMethodBody methodBody = evt.getMethod();
- if ( methodBody instanceof DtxCoordinationCommitOkBody ||
- methodBody instanceof DtxCoordinationForgetOkBody ||
- methodBody instanceof DtxCoordinationGetTimeoutOkBody ||
- methodBody instanceof DtxCoordinationPrepareOkBody ||
- methodBody instanceof DtxCoordinationRecoverOkBody
- )
- {
- invokeCallBack(localCorrelationId,methodBody);
- return true;
- }
- else
- {
- return false;
- }
- }
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
deleted file mode 100644
index fd7b6f8ec6..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.qpid;
-
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.DtxDemarcationEndBody;
-import org.apache.qpid.framing.DtxDemarcationEndOkBody;
-import org.apache.qpid.framing.DtxDemarcationSelectBody;
-import org.apache.qpid.framing.DtxDemarcationSelectOkBody;
-import org.apache.qpid.framing.DtxDemarcationStartBody;
-import org.apache.qpid.framing.DtxDemarcationStartOkBody;
-import org.apache.qpid.nclient.amqp.AMQPDtxDemarcation;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
-import org.apache.qpid.nclient.amqp.state.AMQPState;
-import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
-import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
-import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
-import org.apache.qpid.nclient.amqp.state.AMQPStateType;
-import org.apache.qpid.nclient.config.ClientConfiguration;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.util.AMQPValidator;
-
-public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMethodListener, AMQPDtxDemarcation
-{
- private static final Logger _logger = Logger.getLogger(QpidAMQPDtxDemarcation.class);
-
- // the channelId that will be used for transactions
- private int _channelId;
-
- private Phase _phase;
-
- private AMQPState _currentState;
-
- private AMQPStateManager _stateManager;
-
- private final AMQPState[] _validEndStates = new AMQPState[]
- { AMQPState.DTX_STARTED };
-
- private final AMQPState[] _validStartStates = new AMQPState[]
- { AMQPState.DTX_NOT_STARTED, AMQPState.DTX_END };
-
- // The wait period until a server sends a respond
- private long _serverTimeOut = 1000;
-
- private final Lock _lock = new ReentrantLock();
-
- private final Condition _dtxNotSelected = _lock.newCondition();
-
- private final Condition _dtxNotStarted = _lock.newCondition();
-
- // maybe it needs a better name
- private final Condition _dtxNotEnd = _lock.newCondition();
-
- private DtxDemarcationSelectOkBody _dtxDemarcationSelectOkBody;
-
- private DtxDemarcationStartOkBody _dtxDemarcationStartOkBody;
-
- private DtxDemarcationEndOkBody _dtxDemarcationEndOkBody;
-
- protected QpidAMQPDtxDemarcation(int channelId, Phase phase, AMQPStateManager stateManager)
- {
- _channelId = channelId;
- _phase = phase;
- _stateManager = stateManager;
- _currentState = AMQPState.DTX_CHANNEL_NOT_SELECTED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
- }
-
- /**
- * -------------------------------------------
- * API Methods
- * --------------------------------------------
- */
- public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _dtxDemarcationSelectOkBody = null;
- checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, AMQPState.DTX_NOT_STARTED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, dtxDemarcationSelectBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_dtxNotSelected.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _dtxNotSelected.await();
- AMQPValidator.throwExceptionOnNull(_dtxDemarcationSelectOkBody, "The broker didn't send the DtxDemarcationSelectOkBody in time");
- notifyState(AMQPState.DTX_NOT_STARTED);
- _currentState = AMQPState.DTX_NOT_STARTED;
- return _dtxDemarcationSelectOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in dtx.select", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _dtxDemarcationStartOkBody = null;
- checkIfValidStateTransition(_validStartStates, _currentState, AMQPState.DTX_STARTED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationStartOkBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_dtxNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _dtxNotStarted.await();
- AMQPValidator.throwExceptionOnNull(_dtxDemarcationStartOkBody, "The broker didn't send the DtxDemarcationStartOkBody in time");
- notifyState(AMQPState.DTX_STARTED);
- _currentState = AMQPState.DTX_STARTED;
- return _dtxDemarcationStartOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in dtx.start", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _dtxDemarcationEndOkBody = null;
- checkIfValidStateTransition(_validEndStates, _currentState, AMQPState.DTX_END);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationEndOkBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_dtxNotEnd.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _dtxNotEnd.await();
- AMQPValidator.throwExceptionOnNull(_dtxDemarcationEndOkBody, "The broker didn't send the DtxDemarcationEndOkBody in time");
- notifyState(AMQPState.DTX_END);
- _currentState = AMQPState.DTX_END;
- return _dtxDemarcationEndOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in dtx.start", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * -------------------------------------------
- * AMQPMethodListener methods
- * --------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- _lock.lock();
- try
- {
- if (evt.getMethod() instanceof DtxDemarcationSelectOkBody)
- {
- _dtxDemarcationEndOkBody = (DtxDemarcationEndOkBody) evt.getMethod();
- _dtxNotSelected.signal();
- return true;
- }
- else if (evt.getMethod() instanceof DtxDemarcationStartOkBody)
- {
- _dtxDemarcationStartOkBody = (DtxDemarcationStartOkBody) evt.getMethod();
- _dtxNotStarted.signal();
- return true;
- }
- else if (evt.getMethod() instanceof DtxDemarcationEndOkBody)
- {
- _dtxDemarcationEndOkBody = (DtxDemarcationEndOkBody) evt.getMethod();
- _dtxNotEnd.signal();
- return true;
- }
- else
- {
- return false;
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- private void notifyState(AMQPState newState) throws AMQPException
- {
- _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE));
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java
deleted file mode 100644
index 02b22e0755..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.nclient.amqp.qpid;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.ExchangeDeleteBody;
-import org.apache.qpid.framing.ExchangeDeleteOkBody;
-import org.apache.qpid.nclient.amqp.AMQPCallBack;
-import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
-import org.apache.qpid.nclient.amqp.AMQPExchange;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.Phase;
-
-/**
- *
- * This class represents the Exchange class defined in AMQP.
- * Each method takes an @see AMQPCallBack object if it wants to know
- * the response from the broker to particular method.
- * Clients can handle the reponse asynchronously or block for a response
- * using AMQPCallBack.isComplete() periodically using a loop.
- */
-public class QpidAMQPExchange extends AMQPCallBackSupport implements AMQPMethodListener, AMQPExchange
-{
- private Phase _phase;
-
- protected QpidAMQPExchange(int channelId,Phase phase)
- {
- super(channelId);
- _phase = phase;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPExchange#declare(org.apache.qpid.framing.ExchangeDeclareBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void declare(ExchangeDeclareBody exchangeDeclareBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleNoWait(exchangeDeclareBody.nowait,exchangeDeclareBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPExchange#delete(org.apache.qpid.framing.ExchangeDeleteBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void delete(ExchangeDeleteBody exchangeDeleteBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleNoWait(exchangeDeleteBody.nowait,exchangeDeleteBody,cb);
- _phase.messageSent(msg);
- }
-
-
- /**-------------------------------------------
- * AMQPMethodListener methods
- *--------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- long localCorrelationId = evt.getLocalCorrelationId();
- AMQMethodBody methodBody = evt.getMethod();
- if ( methodBody instanceof ExchangeDeclareOkBody || methodBody instanceof ExchangeDeleteOkBody)
- {
- invokeCallBack(localCorrelationId,methodBody);
- return true;
- }
- else
- {
- return false;
- }
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java
deleted file mode 100644
index e40c3cefa2..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.qpid;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.MessageAppendBody;
-import org.apache.qpid.framing.MessageCancelBody;
-import org.apache.qpid.framing.MessageCheckpointBody;
-import org.apache.qpid.framing.MessageCloseBody;
-import org.apache.qpid.framing.MessageConsumeBody;
-import org.apache.qpid.framing.MessageEmptyBody;
-import org.apache.qpid.framing.MessageGetBody;
-import org.apache.qpid.framing.MessageOffsetBody;
-import org.apache.qpid.framing.MessageOkBody;
-import org.apache.qpid.framing.MessageOpenBody;
-import org.apache.qpid.framing.MessageQosBody;
-import org.apache.qpid.framing.MessageRecoverBody;
-import org.apache.qpid.framing.MessageRejectBody;
-import org.apache.qpid.framing.MessageResumeBody;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.nclient.amqp.AMQPCallBack;
-import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
-import org.apache.qpid.nclient.amqp.AMQPMessage;
-import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.Phase;
-
-/**
- * This class represents the AMQP Message class.
- * You need an instance of this class per channel.
- * A @see AMQPMessageCallBack class is taken as an argument in the constructor.
- * A client can use this class to issue Message class methods on the broker.
- * When the broker issues Message class methods on the client, the client is notified
- * via the AMQPMessageCallBack interface.
- *
- * A JMS Message producer implementation can wrap an instance if this and map
- * JMS method calls to the appropriate AMQP methods.
- *
- * AMQPMessageCallBack can be implemented by the JMS MessageConsumer implementation.
- *
- */
-public class QpidAMQPMessage extends AMQPCallBackSupport implements AMQPMethodListener, AMQPMessage
-{
- private Phase _phase;
- private AMQPMessageCallBack _messageCb;
-
- protected QpidAMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb)
- {
- super(channelId);
- _phase = phase;
- _messageCb = messageCb;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPMessage#transfer(org.apache.qpid.framing.MessageTransferBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
-
- public void transfer(MessageTransferBody messageTransferBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageTransferBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPMessage#consume(org.apache.qpid.framing.MessageConsumeBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void consume(MessageConsumeBody messageConsumeBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageConsumeBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPMessage#cancel(org.apache.qpid.framing.MessageCancelBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void cancel(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPMessage#get(org.apache.qpid.framing.MessageGetBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void get(MessageGetBody messageGetBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageGetBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPMessage#recover(org.apache.qpid.framing.MessageRecoverBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void recover(MessageRecoverBody messageRecoverBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageRecoverBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPMessage#open(org.apache.qpid.framing.MessageOpenBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void open(MessageOpenBody messageOpenBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageOpenBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPMessage#close(org.apache.qpid.framing.MessageCloseBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void close(MessageCloseBody messageCloseBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageCloseBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPMessage#append(org.apache.qpid.framing.MessageAppendBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void append(MessageAppendBody messageAppendBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageAppendBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPMessage#checkpoint(org.apache.qpid.framing.MessageCheckpointBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void checkpoint(MessageCheckpointBody messageCheckpointBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageCheckpointBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPMessage#resume(org.apache.qpid.framing.MessageResumeBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void resume(MessageResumeBody messageResumeBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageResumeBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPMessage#qos(org.apache.qpid.framing.MessageQosBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void qos(MessageQosBody messageQosBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(messageQosBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPMessage#ok(org.apache.qpid.framing.MessageOkBody, long)
- */
- public void ok(MessageOkBody messageOkBody,long correlationId) throws AMQPException
- {
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOkBody,correlationId);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPMessage#reject(org.apache.qpid.framing.MessageRejectBody, long)
- */
- public void reject(MessageRejectBody messageRejectBody,long correlationId) throws AMQPException
- {
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageRejectBody,correlationId);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPMessage#offset(org.apache.qpid.framing.MessageOffsetBody, long)
- */
- public void offset(MessageOffsetBody messageOffsetBody,long correlationId) throws AMQPException
- {
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOffsetBody,correlationId);
- _phase.messageSent(msg);
- }
-
- /**-------------------------------------------
- * AMQPMethodListener methods
- *--------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- long localCorrelationId = evt.getLocalCorrelationId();
- AMQMethodBody methodBody = evt.getMethod();
- if ( methodBody instanceof MessageOkBody ||
- methodBody instanceof MessageRejectBody ||
- methodBody instanceof MessageEmptyBody)
- {
- invokeCallBack(localCorrelationId,methodBody);
- return true;
- }
- else if (methodBody instanceof MessageTransferBody)
- {
- _messageCb.transfer((MessageTransferBody)methodBody, evt.getCorrelationId());
- return true;
- }
- else if (methodBody instanceof MessageAppendBody)
- {
- _messageCb.append((MessageAppendBody)methodBody, evt.getCorrelationId());
- return true;
- }
- else if (methodBody instanceof MessageOpenBody)
- {
- _messageCb.open((MessageOpenBody)methodBody, evt.getCorrelationId());
- return true;
- }
- else if (methodBody instanceof MessageCloseBody)
- {
- _messageCb.close((MessageCloseBody)methodBody, evt.getCorrelationId());
- return true;
- }
- else if (methodBody instanceof MessageCheckpointBody)
- {
- _messageCb.checkpoint((MessageCheckpointBody)methodBody, evt.getCorrelationId());
- return true;
- }
- else if (methodBody instanceof MessageRecoverBody)
- {
- _messageCb.recover((MessageRecoverBody)methodBody, evt.getCorrelationId());
- return true;
- }
- else if (methodBody instanceof MessageResumeBody)
- {
- _messageCb.resume((MessageResumeBody)methodBody, evt.getCorrelationId());
- return true;
- }
- else
- {
- return false;
- }
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java
deleted file mode 100644
index 323ff0cf06..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.qpid;
-
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.QueuePurgeBody;
-import org.apache.qpid.framing.QueuePurgeOkBody;
-import org.apache.qpid.framing.QueueUnbindBody;
-import org.apache.qpid.framing.QueueUnbindOkBody;
-import org.apache.qpid.nclient.amqp.AMQPCallBack;
-import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
-import org.apache.qpid.nclient.amqp.AMQPQueue;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.Phase;
-
-/**
- *
- * This class represents the Queue class defined in AMQP.
- * Each method takes an @see AMQPCallBack object if it wants to know
- * the response from the broker to a particular method.
- * Clients can handle the reponse asynchronously or block for a response
- * using AMQPCallBack.isComplete() periodically using a loop.
- */
-public class QpidAMQPQueue extends AMQPCallBackSupport implements AMQPMethodListener, AMQPQueue
-{
- private Phase _phase;
-
- protected QpidAMQPQueue(int channelId,Phase phase)
- {
- super(channelId);
- _phase = phase;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPQueue#declare(org.apache.qpid.framing.QueueDeclareBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void declare(QueueDeclareBody queueDeclareBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleNoWait(queueDeclareBody.nowait,queueDeclareBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPQueue#bind(org.apache.qpid.framing.QueueBindBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void bind(QueueBindBody queueBindBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleNoWait(queueBindBody.nowait,queueBindBody,cb);
- _phase.messageSent(msg);
- }
-
- // Queue.unbind doesn't have nowait
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPQueue#unbind(org.apache.qpid.framing.QueueUnbindBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void unbind(QueueUnbindBody queueUnbindBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleAsynchronousCall(queueUnbindBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPQueue#purge(org.apache.qpid.framing.QueuePurgeBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void purge(QueuePurgeBody queuePurgeBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleNoWait(queuePurgeBody.nowait,queuePurgeBody,cb);
- _phase.messageSent(msg);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.amqp.AMQPQueue#delete(org.apache.qpid.framing.QueueDeleteBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
- */
- public void delete(QueueDeleteBody queueDeleteBody,AMQPCallBack cb) throws AMQPException
- {
- AMQPMethodEvent msg = handleNoWait(queueDeleteBody.nowait,queueDeleteBody,cb);
- _phase.messageSent(msg);
- }
-
-
- /**-------------------------------------------
- * AMQPMethodListener methods
- *--------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- long localCorrelationId = evt.getLocalCorrelationId();
- AMQMethodBody methodBody = evt.getMethod();
- if ( methodBody instanceof QueueDeclareOkBody ||
- methodBody instanceof QueueBindOkBody ||
- methodBody instanceof QueueUnbindOkBody ||
- methodBody instanceof QueuePurgeOkBody ||
- methodBody instanceof QueueDeleteOkBody
- )
- {
- invokeCallBack(localCorrelationId,methodBody);
- return true;
- }
- else
- {
- return false;
- }
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java
deleted file mode 100644
index 902c80fe77..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.qpid;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
-import org.apache.qpid.nclient.core.AMQPException;
-
-/**
- * This class registeres with the ModelPhase as a AMQMethodListener,
- * to receive method events and then it distributes methods to other listerners
- * using a filtering criteria. The criteria is channel id and method body class.
- * The method listeners are added and removed dynamically
- *
- * <p/>
- */
-public class QpidEventManager implements AMQPEventManager
-{
- private static final Logger _logger = Logger.getLogger(QpidEventManager.class);
-
- private Map<Integer, Map> _channelMap = new ConcurrentHashMap<Integer, Map>();
-
- /**
- * ------------------------------------------------
- * methods introduced by AMQEventManager
- * ------------------------------------------------
- */
- public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l)
- {
- Map<Class, List> _methodListenerMap;
- if (_channelMap.containsKey(channelId))
- {
- _methodListenerMap = _channelMap.get(channelId);
-
- }
- else
- {
- _methodListenerMap = new ConcurrentHashMap<Class, List>();
- _channelMap.put(channelId, _methodListenerMap);
- }
-
- List<AMQPMethodListener> _listeners;
- if (_methodListenerMap.containsKey(clazz))
- {
- _listeners = _methodListenerMap.get(clazz);
- }
- else
- {
- _listeners = new ArrayList<AMQPMethodListener>();
- _methodListenerMap.put(clazz, _listeners);
- }
-
- _listeners.add(l);
-
- }
-
- public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l)
- {
- if (_channelMap.containsKey(channelId))
- {
- Map<Class, List> _methodListenerMap = _channelMap.get(channelId);
-
- if (_methodListenerMap.containsKey(clazz))
- {
- List<AMQPMethodListener> _listeners = _methodListenerMap.get(clazz);
- _listeners.remove(l);
- }
-
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.nclient.model.AMQStateManager#methodReceived(org.apache.qpid.protocol.AMQMethodEvent)
- */
- public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException
- {
- if (_channelMap.containsKey(evt.getChannelId()))
- {
- Map<Class, List> _methodListenerMap = _channelMap.get(evt.getChannelId());
-
- if (_methodListenerMap.containsKey(evt.getMethod().getClass()))
- {
-
- List<AMQPMethodListener> _listeners = _methodListenerMap.get(evt.getMethod().getClass());
- for (AMQPMethodListener l : _listeners)
- {
- l.methodReceived(evt);
- }
-
- return (_listeners.size() > 0);
- }
-
- }
-
- return false;
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java
deleted file mode 100644
index 3fe1ee4cfd..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.qpid;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
-import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
-import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
-import org.apache.qpid.nclient.amqp.state.AMQPStateType;
-import org.apache.qpid.nclient.core.AMQPException;
-
-public class QpidStateManager implements AMQPStateManager
-{
-
- private static final Logger _logger = Logger.getLogger(QpidStateManager.class);
-
- private Map<AMQPStateType, List<AMQPStateListener>> _listernerMap = new ConcurrentHashMap<AMQPStateType, List<AMQPStateListener>>();
-
- public void addListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException
- {
- List<AMQPStateListener> list;
- if(_listernerMap.containsKey(stateType))
- {
- list = _listernerMap.get(stateType);
- }
- else
- {
- list = new ArrayList<AMQPStateListener>();
- _listernerMap.put(stateType, list);
- }
- list.add(l);
- }
-
- public void removeListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException
- {
- if(_listernerMap.containsKey(stateType))
- {
- List<AMQPStateListener> list = _listernerMap.get(stateType);
- list.remove(l);
- }
- }
-
- public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException
- {
-
- if(_listernerMap.containsKey(event.getStateType()))
- {
- List<AMQPStateListener> list = _listernerMap.get(event.getStateType());
- for(AMQPStateListener l: list)
- {
- l.stateChanged(event);
- }
- }
- else
- {
- _logger.warn("There are no registered listerners for state type" + event.getStateType());
- }
- }
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java
deleted file mode 100644
index 31a0197ab1..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.sample;
-
-import org.apache.qpid.framing.MessageAppendBody;
-import org.apache.qpid.framing.MessageCheckpointBody;
-import org.apache.qpid.framing.MessageCloseBody;
-import org.apache.qpid.framing.MessageOpenBody;
-import org.apache.qpid.framing.MessageRecoverBody;
-import org.apache.qpid.framing.MessageResumeBody;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
-import org.apache.qpid.nclient.core.AMQPException;
-
-public class MessageHelper implements AMQPMessageCallBack
-{
-
- public void append(MessageAppendBody messageAppendBody, long correlationId) throws AMQPException
- {
- // TODO Auto-generated method stub
-
- }
-
- public void checkpoint(MessageCheckpointBody messageCheckpointBody, long correlationId) throws AMQPException
- {
- // TODO Auto-generated method stub
-
- }
-
- public void close(MessageCloseBody messageCloseBody, long correlationId) throws AMQPException
- {
- // TODO Auto-generated method stub
-
- }
-
- public void open(MessageOpenBody messageOpenBody, long correlationId) throws AMQPException
- {
- // TODO Auto-generated method stub
-
- }
-
- public void recover(MessageRecoverBody messageRecoverBody, long correlationId) throws AMQPException
- {
- // TODO Auto-generated method stub
-
- }
-
- public void resume(MessageResumeBody messageResumeBody, long correlationId) throws AMQPException
- {
- // TODO Auto-generated method stub
-
- }
-
- public void transfer(MessageTransferBody messageTransferBody, long correlationId) throws AMQPException
- {
- System.out.println("The Broker has sent a message" + messageTransferBody.toString());
- }
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java
deleted file mode 100644
index 908f0adee0..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.sample;
-
-import java.io.UnsupportedEncodingException;
-import java.util.HashSet;
-import java.util.StringTokenizer;
-
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.security.AMQPCallbackHandler;
-import org.apache.qpid.nclient.security.CallbackHandlerRegistry;
-import org.apache.qpid.nclient.transport.ConnectionURL;
-
-public class SecurityHelper
-{
- public static String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException
- {
- final String mechanisms = new String(availableMechanisms, "utf8");
- StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
- HashSet mechanismSet = new HashSet();
- while (tokenizer.hasMoreTokens())
- {
- mechanismSet.add(tokenizer.nextToken());
- }
-
- String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
- StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " ");
- while (prefTokenizer.hasMoreTokens())
- {
- String mech = prefTokenizer.nextToken();
- if (mechanismSet.contains(mech))
- {
- return mech;
- }
- }
- return null;
- }
-
- public static AMQPCallbackHandler createCallbackHandler(String mechanism, ConnectionURL url)
- throws AMQPException
- {
- Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism);
- try
- {
- Object instance = mechanismClass.newInstance();
- AMQPCallbackHandler cbh = (AMQPCallbackHandler) instance;
- cbh.initialise(url);
- return cbh;
- }
- catch (Exception e)
- {
- throw new AMQPException("Unable to create callback handler: " + e, e);
- }
- }
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java
deleted file mode 100644
index 04714d7278..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.sample;
-
-import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
-import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
-import org.apache.qpid.nclient.core.AMQPException;
-
-public class StateHelper implements AMQPStateListener
-{
-
- public void stateChanged(AMQPStateChangedEvent event) throws AMQPException
- {
- String s = event.getStateType() + " changed state from " +
- event.getOldState() + " to " + event.getNewState();
-
- System.out.println(s);
-
- }
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
deleted file mode 100644
index 3dce1cde1e..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.sample;
-
-import java.util.StringTokenizer;
-import java.util.UUID;
-
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.framing.Content;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.MessageCancelBody;
-import org.apache.qpid.framing.MessageConsumeBody;
-import org.apache.qpid.framing.MessageGetBody;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.QueuePurgeBody;
-import org.apache.qpid.framing.QueuePurgeOkBody;
-import org.apache.qpid.nclient.amqp.AMQPCallBack;
-import org.apache.qpid.nclient.amqp.AMQPChannel;
-import org.apache.qpid.nclient.amqp.AMQPClassFactory;
-import org.apache.qpid.nclient.amqp.AMQPConnection;
-import org.apache.qpid.nclient.amqp.AMQPExchange;
-import org.apache.qpid.nclient.amqp.AMQPMessage;
-import org.apache.qpid.nclient.amqp.AMQPQueue;
-import org.apache.qpid.nclient.amqp.AbstractAMQPClassFactory;
-import org.apache.qpid.nclient.amqp.state.AMQPStateType;
-import org.apache.qpid.nclient.transport.AMQPConnectionURL;
-import org.apache.qpid.nclient.transport.ConnectionURL;
-import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
-
-/**
- * This class illustrates the usage of the API
- * Notes this is just a simple demo.
- *
- * I have used Helper classes to keep the code cleaner.
- * Will break this into unit tests later on
- */
-
-@SuppressWarnings("unused")
-public class TestClient
-{
- private byte _major;
-
- private byte _minor;
-
- private ConnectionURL _url;
-
- private static int _channel = 2;
-
- // Need a Class factory per connection
- private AMQPClassFactory _classFactory;
-
- private int _ticket;
-
- public AMQPConnection openConnection() throws Exception
- {
- _classFactory = AbstractAMQPClassFactory.getFactoryInstance();
-
- //_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'");
-
- _url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'");
- return _classFactory.createConnectionClass(_url, ConnectionType.TCP);
- }
-
- public void handleConnectionNegotiation(AMQPConnection con) throws Exception
- {
- StateHelper stateHelper = new StateHelper();
- _classFactory.getStateManager().addListener(AMQPStateType.CONNECTION_STATE, stateHelper);
- _classFactory.getStateManager().addListener(AMQPStateType.CHANNEL_STATE, stateHelper);
-
- //ConnectionStartBody
- ConnectionStartBody connectionStartBody = con.openTCPConnection();
- _major = connectionStartBody.getMajor();
- _minor = connectionStartBody.getMinor();
-
- FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.put(new AMQShortString(ClientProperties.instance.toString()), "Test"); // setting only the client id
-
- final String locales = new String(connectionStartBody.getLocales(), "utf8");
- final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
-
- final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms());
-
- SaslClient sc = Sasl.createSaslClient(new String[]
- { mechanism }, null, "AMQP", "localhost", null, SecurityHelper.createCallbackHandler(mechanism, _url));
-
- ConnectionStartOkBody connectionStartOkBody = ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, new AMQShortString(
- tokenizer.nextToken()), new AMQShortString(mechanism), (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null));
- // ConnectionSecureBody
- AMQMethodBody body = con.startOk(connectionStartOkBody);
- ConnectionTuneBody connectionTuneBody;
-
- if (body instanceof ConnectionSecureBody)
- {
- ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody) body;
- ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(_major, _minor, sc
- .evaluateChallenge(connectionSecureBody.getChallenge()));
- //Assuming the server is not going to send another challenge
- connectionTuneBody = (ConnectionTuneBody) con.secureOk(connectionSecureOkBody);
-
- }
- else
- {
- connectionTuneBody = (ConnectionTuneBody) body;
- }
-
- // Using broker supplied values
- ConnectionTuneOkBody connectionTuneOkBody = ConnectionTuneOkBody.createMethodBody(_major, _minor, connectionTuneBody.getChannelMax(),
- connectionTuneBody.getFrameMax(), connectionTuneBody.getHeartbeat());
- con.tuneOk(connectionTuneOkBody);
-
- ConnectionOpenBody connectionOpenBody = ConnectionOpenBody.createMethodBody(_major, _minor, null, true, new AMQShortString(_url
- .getVirtualHost()));
-
- ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody);
- }
-
- public void handleChannelNegotiation() throws Exception
- {
- AMQPChannel channel = _classFactory.createChannelClass(_channel);
-
- ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1"));
- ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody);
-
- //lets have some fun
- ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false);
-
- ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody);
- System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend"));
-
- channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true);
- channelFlowOkBody = channel.flow(channelFlowBody);
- System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend"));
- }
-
- public void createExchange() throws Exception
- {
- AMQPExchange exchange = _classFactory.createExchangeClass(_channel);
-
- ExchangeDeclareBody exchangeDeclareBody = ExchangeDeclareBody.createMethodBody(_major, _minor, null, // arguments
- false,//auto delete
- false,// durable
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME), true, //internal
- false,// nowait
- false,// passive
- _ticket, new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
-
- AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange");
- exchange.declare(exchangeDeclareBody, cb);
- // Blocking for response
- while (!cb.isComplete())
- {
- }
- }
-
- public void createAndBindQueue() throws Exception
- {
- AMQPQueue queue = _classFactory.createQueueClass(_channel);
-
- QueueDeclareBody queueDeclareBody = QueueDeclareBody.createMethodBody(_major, _minor, null, //arguments
- false,//auto delete
- false,// durable
- false, //exclusive,
- false, //nowait,
- false, //passive,
- new AMQShortString("MyTestQueue"), 0);
-
- AMQPCallBack cb = new AMQPCallBack()
- {
-
- @Override
- public void brokerResponded(AMQMethodBody body)
- {
- QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody) body;
- System.out.println("[Broker has created the queue, " + "message count " + queueDeclareOkBody.getMessageCount() + "consumer count "
- + queueDeclareOkBody.getConsumerCount() + "]\n");
- }
-
- @Override
- public void brokerRespondedWithError(AMQException e)
- {
- }
-
- };
-
- queue.declare(queueDeclareBody, cb);
- //Blocking for response
- while (!cb.isComplete())
- {
- }
-
- QueueBindBody queueBindBody = QueueBindBody.createMethodBody(_major, _minor, null, //arguments
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange
- false, //nowait
- new AMQShortString("MyTestQueue"), //queue
- new AMQShortString("RH"), //routingKey
- 0 //ticket
- );
-
- cb = createCallBackWithMessage("Broker has bound the queue");
- queue.bind(queueBindBody, cb);
- //Blocking for response
- while (!cb.isComplete())
- {
- }
- }
-
- public void purgeQueue() throws Exception
- {
- AMQPQueue queue = _classFactory.createQueueClass(_channel);
-
- QueuePurgeBody queuePurgeBody = QueuePurgeBody.createMethodBody(_major, _minor, false, //nowait
- new AMQShortString("MyTestQueue"), //queue
- 0 //ticket
- );
-
- AMQPCallBack cb = new AMQPCallBack()
- {
-
- @Override
- public void brokerResponded(AMQMethodBody body)
- {
- QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody) body;
- System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n");
- }
-
- @Override
- public void brokerRespondedWithError(AMQException e)
- {
- }
-
- };
-
- queue.purge(queuePurgeBody, cb);
- //Blocking for response
- while (!cb.isComplete())
- {
- }
-
- }
-
- public void deleteQueue() throws Exception
- {
- AMQPQueue queue = _classFactory.createQueueClass(_channel);
-
- QueueDeleteBody queueDeleteBody = QueueDeleteBody.createMethodBody(_major, _minor, false, //ifEmpty
- false, //ifUnused
- false, //nowait
- new AMQShortString("MyTestQueue"), //queue
- 0 //ticket
- );
-
- AMQPCallBack cb = new AMQPCallBack()
- {
-
- @Override
- public void brokerResponded(AMQMethodBody body)
- {
- QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody) body;
- System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n");
- }
-
- @Override
- public void brokerRespondedWithError(AMQException e)
- {
- }
-
- };
-
- queue.delete(queueDeleteBody, cb);
- //Blocking for response
- while (!cb.isComplete())
- {
- }
-
- }
-
- public void publishAndSubscribe() throws Exception
- {
- AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper());
- MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor, new AMQShortString("myClient"),// destination
- false, //exclusive
- null, //filter
- false, //noAck,
- false, //noLocal,
- new AMQShortString("MyTestQueue"), //queue
- 0 //ticket
- );
-
- AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume");
- message.consume(messageConsumeBody, cb);
- //Blocking for response
- while (!cb.isComplete())
- {
- }
-
- // Sending 5 messages serially
- for (int i = 0; i < 5; i++)
- {
- cb = createCallBackWithMessage("Broker has accepted msg " + i);
- message.transfer(createMessages("Test" + i), cb);
- while (!cb.isComplete())
- {
- }
- }
-
- MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient"));
-
- AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel");
- message.cancel(messageCancelBody, cb2);
-
- }
-
- private MessageTransferBody createMessages(String content) throws Exception
- {
- FieldTable headers = FieldTableFactory.newFieldTable();
- headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + "");
-
- MessageTransferBody messageTransferBody = MessageTransferBody.createMethodBody(_major, _minor, new AMQShortString("testApp"), //appId
- headers, //applicationHeaders
- new Content(Content.TypeEnum.INLINE_T, content.getBytes()), //body
- new AMQShortString(""), //contentEncoding,
- new AMQShortString("text/plain"), //contentType
- new AMQShortString("testApp"), //correlationId
- (short) 1, //deliveryMode non persistant
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
- 0l, //expiration
- false, //immediate
- false, //mandatory
- new AMQShortString(UUID.randomUUID().toString()), //messageId
- (short) 0, //priority
- false, //redelivered
- new AMQShortString("RH"), //replyTo
- new AMQShortString("RH"), //routingKey,
- "abc".getBytes(), //securityToken
- 0, //ticket
- System.currentTimeMillis(), //timestamp
- new AMQShortString(""), //transactionId
- 0l, //ttl,
- new AMQShortString("Hello") //userId
- );
-
- return messageTransferBody;
-
- }
-
- public void publishAndGet() throws Exception
- {
- AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper());
- AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5");
-
- MessageGetBody messageGetBody = MessageGetBody.createMethodBody(_major, _minor, new AMQShortString("myClient"), false, //noAck
- new AMQShortString("MyTestQueue"), //queue
- 0 //ticket
- );
-
- //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper());
- message.transfer(createMessages("Test"), cb);
- while (!cb.isComplete())
- {
- }
-
- cb = createCallBackWithMessage("Broker has accepted get");
- message.get(messageGetBody, cb);
- }
-
- // Creates a gneric call back and prints the given message
- private AMQPCallBack createCallBackWithMessage(final String msg)
- {
- AMQPCallBack cb = new AMQPCallBack()
- {
-
- @Override
- public void brokerResponded(AMQMethodBody body)
- {
- System.out.println(msg);
- }
-
- @Override
- public void brokerRespondedWithError(AMQException e)
- {
- }
-
- };
-
- return cb;
- }
-
- public static void main(String[] args)
- {
- TestClient test = new TestClient();
- try
- {
- AMQPConnection con = test.openConnection();
- test.handleConnectionNegotiation(con);
- test.handleChannelNegotiation();
- test.createExchange();
- test.createAndBindQueue();
- test.publishAndSubscribe();
- test.purgeQueue();
- test.publishAndGet();
- test.deleteQueue();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
deleted file mode 100644
index 18219abc46..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.state;
-
-/**
- * States used in the AMQ protocol. Used by the finite state machine to determine
- * valid responses.
- */
-public class AMQPState
-{
- private final int _id;
-
- private final String _name;
-
- private AMQPState(int id, String name)
- {
- _id = id;
- _name = name;
- }
-
- public String toString()
- {
- //return "AMQState: id = " + _id + " name: " + _name;
- return _name; // looks better with loggin
- }
-
- // Connection state
- public static final AMQPState CONNECTION_UNDEFINED = new AMQPState(0, "CONNECTION_UNDEFINED");
- public static final AMQPState CONNECTION_NOT_STARTED = new AMQPState(1, "CONNECTION_NOT_STARTED");
- public static final AMQPState CONNECTION_NOT_SECURE = new AMQPState(2, "CONNECTION_NOT_SECURE");
- public static final AMQPState CONNECTION_NOT_TUNED = new AMQPState(2, "CONNECTION_NOT_TUNED");
- public static final AMQPState CONNECTION_NOT_OPENED = new AMQPState(3, "CONNECTION_NOT_OPENED");
- public static final AMQPState CONNECTION_OPEN = new AMQPState(4, "CONNECTION_OPEN");
- public static final AMQPState CONNECTION_CLOSING = new AMQPState(5, "CONNECTION_CLOSING");
- public static final AMQPState CONNECTION_CLOSED = new AMQPState(6, "CONNECTION_CLOSED");
-
- // Channel state
- public static final AMQPState CHANNEL_NOT_OPENED = new AMQPState(10, "CHANNEL_NOT_OPENED");
- public static final AMQPState CHANNEL_OPENED = new AMQPState(11, "CHANNEL_OPENED");
- public static final AMQPState CHANNEL_CLOSED = new AMQPState(11, "CHANNEL_CLOSED");
- public static final AMQPState CHANNEL_SUSPEND = new AMQPState(11, "CHANNEL_SUSPEND");
-
- // Distributed Transaction state
- public static final AMQPState DTX_CHANNEL_NOT_SELECTED = new AMQPState(10, "DTX_CHANNEL_NOT_SELECTED");
- public static final AMQPState DTX_NOT_STARTED = new AMQPState(10, "DTX_NOT_STARTED");
- public static final AMQPState DTX_STARTED = new AMQPState(10, "DTX_STARTED");
- public static final AMQPState DTX_END = new AMQPState(10, "DTX_END");
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java
deleted file mode 100644
index 506d267a9e..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.state;
-
-public class AMQPStateChangedEvent
-{
- private AMQPState _oldState;
-
- private AMQPState _newState;
-
- private AMQPStateType _stateType;
-
- public AMQPStateChangedEvent(AMQPState oldState, AMQPState newState, AMQPStateType stateType)
- {
- _oldState = oldState;
- _newState = newState;
- _stateType = stateType;
- }
-
- public AMQPState getNewState()
- {
- return _newState;
- }
-
- public void setNewState(AMQPState newState)
- {
- this._newState = newState;
- }
-
- public AMQPState getOldState()
- {
- return _oldState;
- }
-
- public void setOldState(AMQPState oldState)
- {
- this._oldState = oldState;
- }
-
- public AMQPStateType getStateType()
- {
- return _stateType;
- }
-
- public void setStateType(AMQPStateType stateType)
- {
- this._stateType = stateType;
- }
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java
deleted file mode 100644
index 974f707504..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.qpid.nclient.amqp.state;
-
-import org.apache.qpid.nclient.core.AMQPException;
-
-public interface AMQPStateListener
-{
- public void stateChanged(AMQPStateChangedEvent event) throws AMQPException;
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java
deleted file mode 100644
index c1fde7181d..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.qpid.nclient.amqp.state;
-
-import org.apache.qpid.nclient.core.AMQPException;
-
-public class AMQPStateMachine
-{
- protected void checkIfValidStateTransition(AMQPState correctState,AMQPState currentState,AMQPState requiredState) throws IllegalStateTransitionException
- {
- if (currentState != correctState)
- {
- throw new IllegalStateTransitionException(currentState,requiredState);
- }
- }
-
- protected void checkIfValidStateTransition(AMQPState[] correctStates,AMQPState currentState,AMQPState requiredState) throws IllegalStateTransitionException
- {
- for(AMQPState correctState :correctStates)
- {
- if (currentState == correctState)
- {
- return;
- }
- }
- throw new IllegalStateTransitionException(currentState,requiredState);
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
deleted file mode 100644
index 9bc60b658e..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.qpid.nclient.amqp.state;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.nclient.core.AMQPException;
-
-public interface AMQPStateManager
-{
- public void addListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException;
-
- public void removeListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException;
-
- public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException;
-} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java
deleted file mode 100644
index e190d639f2..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.state;
-
-/**
- * The Type of States used in the AMQ protocol.
- * This allows to partition listeners by the type of states they want
- * to listen rather than all.
- * For example an Object might only be interested in Channel state
- */
-public class AMQPStateType
-{
- private final int _typeId;
-
- private final String _typeName;
-
- private AMQPStateType(int id, String name)
- {
- _typeId = id;
- _typeName = name;
- }
-
- public String toString()
- {
- return _typeName;
- }
-
- // Connection state
- public static final AMQPStateType CONNECTION_STATE = new AMQPStateType(0, "CONNECTION_STATE");
- public static final AMQPStateType CHANNEL_STATE = new AMQPStateType(1, "CHANNEL_STATE");
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java
deleted file mode 100644
index fdc24d1d2f..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.amqp.state;
-
-import org.apache.qpid.nclient.core.AMQPException;
-
-public class IllegalStateTransitionException extends AMQPException
-{
- private AMQPState _currentState;
- private AMQPState _desiredState;
-
- public IllegalStateTransitionException(AMQPState currentState, AMQPState desiredState)
- {
- super("No valid state transition defined from state " + currentState +
- " to state " + desiredState);
- _currentState = currentState;
- _desiredState = desiredState;
- }
-
- public AMQPState getCurrentState()
- {
- return _currentState;
- }
-
- public AMQPState getDesiredState()
- {
- return _desiredState;
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
deleted file mode 100644
index 7bc77e02c0..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package org.apache.qpid.nclient.config;
-
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Collection;
-import java.util.List;
-import java.util.TreeMap;
-
-import javax.security.sasl.SaslClientFactory;
-
-import org.apache.commons.configuration.CombinedConfiguration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.SystemConfiguration;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.security.AMQPCallbackHandler;
-
-/**
- * Loads a properties file from classpath.
- * These values can be overwritten using system properties
- */
-public class ClientConfiguration extends CombinedConfiguration {
-
- private static final Logger _logger = Logger.getLogger(ClientConfiguration.class);
- private static ClientConfiguration _instance = new ClientConfiguration();
-
- ClientConfiguration()
- {
- super();
- addConfiguration(new SystemConfiguration());
- try
- {
- XMLConfiguration config = new XMLConfiguration();
- config.load(getInputStream());
- addConfiguration(config);
- }
- catch (ConfigurationException e)
- {
- _logger.warn("Client Properties missing, using defaults",e);
- }
- }
-
- public static ClientConfiguration get()
- {
- return _instance;
- }
-
- private InputStream getInputStream()
- {
- if (System.getProperty(QpidConstants.CONFIG_FILE_PATH) != null)
- {
- try
- {
- return new FileInputStream((String)System.getProperty(QpidConstants.CONFIG_FILE_PATH));
- }
- catch(Exception e)
- {
- return this.getClass().getResourceAsStream("client.xml");
- }
- }
- else
- {
- return this.getClass().getResourceAsStream("client.xml");
- }
-
- }
-
- public static void main(String[] args)
- {
- String key = QpidConstants.AMQP_SECURITY + "." +
- QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES + "." +
- QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY;
-
- TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister =
- new TreeMap<String, Class<? extends SaslClientFactory>>();
-
- int index = ClientConfiguration.get().getMaxIndex(key);
-
- for (int i=0; i<index+1;i++)
- {
- String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]");
- String className = ClientConfiguration.get().getString(key + "(" + i + ")" );
- try
- {
- Class<?> clazz = Class.forName(className);
- if (!(SaslClientFactory.class.isAssignableFrom(clazz)))
- {
- _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping");
- continue;
- }
- factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz);
- }
- catch (Exception ex)
- {
- _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping");
- }
- }
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml
deleted file mode 100644
index 6eeedbe1ff..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<?xml version="1.0" encoding="ISO-8859-1" ?>
-<qpidClientConfig>
-
- <security>
- <saslClientFactoryTypes>
- <saslClientFactory type="AMQPLAIN">org.apache.qpid.nclient.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory>
- </saslClientFactoryTypes>
- <securityMechanisms>
- <securityMechanismHandler type="PLAIN">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
- <securityMechanismHandler type="CRAM_MD5">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
- </securityMechanisms>
- </security>
-
- <!-- Transport Layer properties -->
- <useSharedReadWritePool>false</useSharedReadWritePool>
- <enableDirectBuffers>true</enableDirectBuffers>
- <enablePooledAllocator>false</enablePooledAllocator>
- <tcpNoDelay>true</tcpNoDelay>
- <sendBufferSizeInKb>32</sendBufferSizeInKb>
- <reciveBufferSizeInKb>32</reciveBufferSizeInKb>
- <qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass>
-
- <!-- Execution Layer properties -->
- <maxAccumilatedResponses>20</maxAccumilatedResponses>
-
- <!-- Model Phase properties -->
- <serverTimeoutInMilliSeconds>60000</serverTimeoutInMilliSeconds>
- <amqpClassFactory>org.apache.qpid.nclient.amqp.qpid.QpidAMQPClassFactory</amqpClassFactory>
- <maxAccumilatedResponses>20</maxAccumilatedResponses>
-
- <phasePipe>
- <phase index="0">org.apache.qpid.nclient.transport.TransportPhase</phase>
- <phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase</phase>
- <phase index="2">org.apache.qpid.nclient.model.ModelPhase</phase>
- </phasePipe>
-
-</qpidClientConfig> \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java
deleted file mode 100644
index a029f7d4ff..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package org.apache.qpid.nclient.core;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-
-
-public class AMQPException extends Exception
-{
- private int _errorCode;
-
- public AMQPException(String message)
- {
- super(message);
- }
-
- public AMQPException(String msg, Throwable t)
- {
- super(msg, t);
- }
-
- public AMQPException(int errorCode, String msg, Throwable t)
- {
- super(msg + " [error code " + errorCode + ']', t);
- _errorCode = errorCode;
- }
-
- public AMQPException(int errorCode, String msg)
- {
- super(msg + " [error code " + errorCode + ']');
- _errorCode = errorCode;
- }
-
- public AMQPException(Logger logger, String msg, Throwable t)
- {
- this(msg, t);
- logger.error(getMessage(), this);
- }
-
- public AMQPException(Logger logger, String msg)
- {
- this(msg);
- logger.error(getMessage(), this);
- }
-
- public AMQPException(Logger logger, int errorCode, String msg)
- {
- this(errorCode, msg);
- logger.error(getMessage(), this);
- }
-
- public int getErrorCode()
- {
- return _errorCode;
- }
- }
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java
deleted file mode 100644
index bf6a19b920..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.core;
-
-public abstract class AbstractPhase implements Phase {
-
- protected PhaseContext _ctx;
- protected Phase _nextInFlowPhase;
- protected Phase _nextOutFlowPhase;
-
-
- /**
- * ------------------------------------------------
- * Phase - method introduced by Phase
- * ------------------------------------------------
- */
- public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase) {
- _nextInFlowPhase = nextInFlowPhase;
- _nextOutFlowPhase = nextOutFlowPhase;
- _ctx = ctx;
- }
-
- /**
- * The start is called from the top
- * of the pipe and is propogated the
- * bottom.
- *
- * Each phase can override this to do
- * any phase specific logic related
- * pipe.start()
- */
- public void start()throws AMQPException
- {
- if(_nextOutFlowPhase != null)
- {
- _nextOutFlowPhase.start();
- }
- }
-
- /**
- * Each phase can override this to do
- * any phase specific cleanup
- */
- public void close()throws AMQPException
- {
-
- }
-
- public void messageReceived(Object frame) throws AMQPException
- {
- if(_nextInFlowPhase != null)
- {
- _nextInFlowPhase.messageReceived(frame);
- }
- }
-
- public void messageSent(Object frame) throws AMQPException
- {
- if (_nextOutFlowPhase != null)
- {
- _nextOutFlowPhase.messageSent(frame);
- }
- }
-
- public PhaseContext getPhaseContext()
- {
- return _ctx;
- }
-
- public Phase getNextInFlowPhase() {
- return _nextInFlowPhase;
- }
-
- public Phase getNextOutFlowPhase() {
- return _nextOutFlowPhase;
- }
-
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java
deleted file mode 100644
index a3455cdacd..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.qpid.nclient.core;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class DefaultPhaseContext implements PhaseContext
-{
- public Map<String,Object> _props = new ConcurrentHashMap<String,Object>();
-
- public Object getProperty(String name)
- {
- return _props.get(name);
- }
-
- public void setProperty(String name, Object value)
- {
- _props.put(name, value);
- }
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java
deleted file mode 100644
index 7aa10b77ff..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.qpid.nclient.core;
-
-
-public interface Phase
-{
-
- /**
- * This method is used to initialize a phase
- *
- * @param ctx
- * @param nextInFlowPhase
- * @param nextOutFlowPhase
- */
- public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase);
-
- /**
- *
- * Implement logic related to physical opening
- * of the pipe
- */
- public void start()throws AMQPException;
-
- /**
- * Implement cleanup in this method.
- * This indicates the pipe is closing
- */
- public void close()throws AMQPException;
-
- public void messageReceived(Object msg) throws AMQPException;
-
- public void messageSent(Object msg) throws AMQPException;
-
- public PhaseContext getPhaseContext();
-
- public Phase getNextOutFlowPhase();
-
- public Phase getNextInFlowPhase();
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java
deleted file mode 100644
index d5942fd785..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.core;
-
-/**
- * This can be thought of as a session context associated
- * with the pipe. This is transient and is scoped by the
- * duration of the physical connection.
- *
- */
-public interface PhaseContext {
-
- public Object getProperty(String name);
-
- public void setProperty(String name, Object value);
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
deleted file mode 100644
index 9542aab344..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.apache.qpid.nclient.core;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.qpid.nclient.config.ClientConfiguration;
-
-public class PhaseFactory
-{
- /**
- * This method will create the pipe and return a reference
- * to the top of the pipeline.
- *
- * The application can then use this (top most) phase and all
- * calls will propogated down the pipe.
- *
- * Simillar calls orginating at the bottom of the pipeline
- * will be propogated to the top.
- *
- * @param ctx
- * @return
- * @throws AMQPException
- */
- public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
- {
- String key = QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE;
- Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>();
- List<String> list = ClientConfiguration.get().getList(key);
- int index = 0;
- for(String s:list)
- {
- try
- {
- Phase temp = (Phase)Class.forName(s).newInstance();
- phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + QpidConstants.INDEX),temp) ;
- }
- catch(Exception e)
- {
- throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e);
- }
- index++;
- }
-
- Phase current = null;
- Phase prev = null;
- Phase next = null;
- //Lets build the phase pipe.
- for (int i=0; i<phaseMap.size();i++)
- {
- current = phaseMap.get(i);
- if (i+1 < phaseMap.size())
- {
- next = phaseMap.get(i+1);
- }
- current.init(ctx, next, prev);
- prev = current;
- next = null;
- }
-
- return current;
- }
-} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
deleted file mode 100644
index 034fc28070..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.apache.qpid.nclient.core;
-
-public interface QpidConstants
-{
-
- // Common properties
- public static long EMPTY_CORRELATION_ID = -1;
-
- public static int CHANNEL_ZERO = 0;
-
- public static String CONFIG_FILE_PATH = "ConfigFilePath";
-
- // Phase Context properties
- public final static String AMQP_BROKER_DETAILS = "AMQP_BROKER_DETAILS";
-
- public final static String MINA_IO_CONNECTOR = "MINA_IO_CONNECTOR";
-
- public final static String EVENT_MANAGER = "EVENT_MANAGER";
-
- /**---------------------------------------------------------------
- * Configuration file properties
- * ------------------------------------------------------------
- */
-
- // Model Layer properties
- public final static String SERVER_TIMEOUT_IN_MILLISECONDS = "serverTimeoutInMilliSeconds";
- public final static String AMQP_CLASS_FACTORY = "amqpClassFactory";
-
- // MINA properties
- public final static String USE_SHARED_READ_WRITE_POOL = "useSharedReadWritePool";
-
- public final static String ENABLE_DIRECT_BUFFERS = "enableDirectBuffers";
-
- public final static String ENABLE_POOLED_ALLOCATOR = "enablePooledAllocator";
-
- public final static String TCP_NO_DELAY = "tcpNoDelay";
-
- public final static String SEND_BUFFER_SIZE_IN_KB = "sendBufferSizeInKb";
-
- public final static String RECEIVE_BUFFER_SIZE_IN_KB = "reciveBufferSizeInKb";
-
- // Security properties
- public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES = "saslClientFactoryTypes";
-
- public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY = "saslClientFactory";
-
- public final static String TYPE = "[@type]";
-
- public final static String AMQP_SECURITY = "security";
-
- public final static String AMQP_SECURITY_MECHANISMS = "securityMechanisms";
-
- public final static String AMQP_SECURITY_MECHANISM_HANDLER = "securityMechanismHandler";
-
- // Execution Layer properties
- public final static String MAX_ACCUMILATED_RESPONSES = "maxAccumilatedResponses";
-
- //Transport Layer properties
- public final static String QPID_VM_BROKER_CLASS = "qpidVMBrokerClass";
-
- //Phase pipe properties
- public final static String PHASE_PIPE = "phasePipe";
-
- public final static String PHASE = "phase";
-
- public final static String INDEX = "[@index]";
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java
deleted file mode 100644
index cc1503d414..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.qpid.nclient.core;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class TransientPhaseContext implements PhaseContext {
-
- private Map map = new ConcurrentHashMap();
-
- public Object getProperty(String name) {
- return map.get(name);
- }
-
- public void setProperty(String name, Object value) {
- map.put(name, value);
- }
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
deleted file mode 100644
index 1305500439..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
+++ /dev/null
@@ -1,188 +0,0 @@
-package org.apache.qpid.nclient.execution;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQRequestBody;
-import org.apache.qpid.framing.AMQResponseBody;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.AbstractPhase;
-import org.apache.qpid.nclient.core.QpidConstants;
-
-/**
- * Corressponds to the Layer 2 in AMQP.
- * This phase handles the correlation of amqp messages
- * This class implements the 0.9 spec (request/response)
- */
-public class ExecutionPhase extends AbstractPhase
-{
-
- protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class);
-
- protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap();
-
- protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap();
-
- /**
- * --------------------------------------------------
- * Phase related methods
- * --------------------------------------------------
- */
-
- // should add these in the init method
- //_channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
- //_channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
- public void messageReceived(Object msg) throws AMQPException
- {
- AMQFrame frame = (AMQFrame) msg;
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- if (bodyFrame instanceof AMQRequestBody)
- {
- AMQPMethodEvent event;
- try
- {
- event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody) bodyFrame);
- super.messageReceived(event);
- }
- catch (Exception e)
- {
- _logger.error("Error handling request", e);
- }
-
- }
- else if (bodyFrame instanceof AMQResponseBody)
- {
- List<AMQPMethodEvent> events;
- try
- {
- events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody) bodyFrame);
- for (AMQPMethodEvent event : events)
- {
- super.messageReceived(event);
- }
- }
- catch (Exception e)
- {
- _logger.error("Error handling response", e);
- }
- }
- }
-
- /**
- * Need to figure out if the message is a request or a response
- * that needs to be sent and then delegate it to the Request or response manager
- * to prepare it.
- */
- public void messageSent(Object msg) throws AMQPException
- {
- AMQPMethodEvent evt = (AMQPMethodEvent) msg;
- if (evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID)
- {
- // This is a request
- AMQFrame frame = handleRequest(evt);
- super.messageSent(frame);
- }
- else
- {
- // This is a response
- List<AMQFrame> frames = handleResponse(evt);
- for (AMQFrame frame : frames)
- {
- super.messageSent(frame);
- }
- }
- }
-
- /**
- * ------------------------------------------------
- * Methods to handle request response
- * -----------------------------------------------
- */
- private AMQPMethodEvent messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Request frame received: " + requestBody);
- }
-
- ResponseManager responseManager;
- if(_channelId2ResponseMgrMap.containsKey(channelId))
- {
- responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId);
- }
- else
- {
- responseManager = new ResponseManager(0,channelId,false);
- _channelId2ResponseMgrMap.put(channelId, responseManager);
- }
- return responseManager.requestReceived(requestBody);
- }
-
- private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody)
- throws Exception
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Response frame received: " + responseBody);
- }
-
- RequestManager requestManager;
- if (_channelId2RequestMgrMap.containsKey(channelId))
- {
- requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId);
- }
- else
- {
- requestManager = new RequestManager(0,channelId,false);
- _channelId2RequestMgrMap.put(channelId, requestManager);
- }
-
- return requestManager.responseReceived(responseBody);
- }
-
- private AMQFrame handleRequest(AMQPMethodEvent evt)
- {
- int channelId = evt.getChannelId();
- RequestManager requestManager;
- if (_channelId2RequestMgrMap.containsKey(channelId))
- {
- requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId);
- }
- else
- {
- requestManager = new RequestManager(0,channelId,false);
- _channelId2RequestMgrMap.put(channelId, requestManager);
- }
- return requestManager.sendRequest(evt);
- }
-
- private List<AMQFrame> handleResponse(AMQPMethodEvent evt) throws AMQPException
- {
- int channelId = evt.getChannelId();
- ResponseManager responseManager;
- if(_channelId2ResponseMgrMap.containsKey(channelId))
- {
- responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId);
- }
- else
- {
- responseManager = new ResponseManager(0,channelId,false);
- _channelId2ResponseMgrMap.put(channelId, responseManager);
- }
- try
- {
- return responseManager.sendResponse(evt);
- }
- catch (Exception e)
- {
- throw new AMQPException("Error handling response", e);
- }
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
deleted file mode 100644
index 0084f27717..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.execution;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQRequestBody;
-import org.apache.qpid.framing.AMQResponseBody;
-import org.apache.qpid.framing.RequestResponseMappingException;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-
-public class RequestManager
-{
- private static final Logger logger = Logger.getLogger(RequestManager.class);
-
- private int channel;
-
- /**
- * Used for logging and debugging only - allows the context of this instance
- * to be known.
- */
- private boolean serverFlag;
- private long connectionId;
-
- /**
- * Request and response frames must have a requestID and responseID which
- * indepenedently increment from 0 on a per-channel basis. These are the
- * counters, and contain the value of the next (not yet used) frame.
- */
- private long requestIdCount;
-
- /**
- * These keep track of the last requestId and responseId to be received.
- */
- private long lastProcessedResponseId;
-
- private ConcurrentHashMap<Long, CorrelationID> requestSentMap;
-
- public RequestManager(long connectionId, int channel, boolean serverFlag)
- {
- this.channel = channel;
- this.serverFlag = serverFlag;
- this.connectionId = connectionId;
- requestIdCount = 1L;
- lastProcessedResponseId = 0L;
- requestSentMap = new ConcurrentHashMap<Long, CorrelationID>();
- }
-
- // *** Functions to originate a request ***
-
- public AMQFrame sendRequest(AMQPMethodEvent evt)
- {
- long requestId = getNextRequestId(); // Get new request ID
- AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId,
- lastProcessedResponseId, evt.getMethod());
- if (logger.isDebugEnabled())
- {
- logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
- "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + evt.getMethod());
- }
- requestSentMap.put(requestId, new CorrelationID(evt.getCorrelationId(), evt.getLocalCorrelationId()));
- return requestFrame;
- }
-
- public List<AMQPMethodEvent> responseReceived(AMQResponseBody responseBody)
- throws Exception
- {
- long requestIdStart = responseBody.getRequestId();
- long requestIdStop = requestIdStart + responseBody.getBatchOffset();
- if (logger.isDebugEnabled())
- {
- logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " +
- responseBody + "; " + responseBody.getMethodPayload());
- }
-
- List<AMQPMethodEvent> events = new ArrayList<AMQPMethodEvent>();
- for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
- {
- if (requestSentMap.get(requestId) == null)
- {
- throw new RequestResponseMappingException(requestId,
- "Failed to locate requestId " + requestId + " in requestSentMap.");
- }
- CorrelationID correlationID = requestSentMap.get(requestId);
- AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, responseBody.getMethodPayload(),
- correlationID.getSystemCorrelationID(),correlationID.getLocalCorrelationID());
- events.add(methodEvent);
- requestSentMap.remove(requestId);
- }
- lastProcessedResponseId = responseBody.getResponseId();
- return events;
- }
-
- // *** Management functions ***
-
- public int requestsMapSize()
- {
- return requestSentMap.size();
- }
-
- // *** Private helper functions ***
-
- private long getNextRequestId()
- {
- return requestIdCount++;
- }
-
- private class CorrelationID
- {
- // Use for the request/response stuff
- private long _systemCorrelationID;
- // used internally to track callbacks
- private long _localCorrelationID;
-
- CorrelationID(long systemCorrelationID,long localCorrelationID)
- {
- _localCorrelationID = localCorrelationID;
- _systemCorrelationID = systemCorrelationID;
- }
-
- public long getLocalCorrelationID()
- {
- return _localCorrelationID;
- }
-
- public void setLocalCorrelationID(long correlationID)
- {
- _localCorrelationID = correlationID;
- }
-
- public long getSystemCorrelationID()
- {
- return _systemCorrelationID;
- }
-
- public void setSystemCorrelationID(long correlationID)
- {
- _systemCorrelationID = correlationID;
- }
-
-
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
deleted file mode 100644
index c5a75d242f..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.execution;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQRequestBody;
-import org.apache.qpid.framing.AMQResponseBody;
-import org.apache.qpid.framing.RequestResponseMappingException;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.config.ClientConfiguration;
-import org.apache.qpid.nclient.core.QpidConstants;
-
-public class ResponseManager
-{
- private static final Logger logger = Logger.getLogger(ResponseManager.class);
-
- private int channel;
-
- /**
- * Used for logging and debugging only - allows the context of this instance
- * to be known.
- */
- private boolean serverFlag;
- private long connectionId;
-
- private int maxAccumulatedResponses = 20; // Default
-
- /**
- * Request and response frames must have a requestID and responseID which
- * indepenedently increment from 0 on a per-channel basis. These are the
- * counters, and contain the value of the next (not yet used) frame.
- */
- private long responseIdCount;
-
- /**
- * These keep track of the last requestId and responseId to be received.
- */
- private long lastReceivedRequestId;
-
- /**
- * Last requestID sent in a response (for batching)
- */
- private long lastSentRequestId;
-
- private class ResponseStatus implements Comparable<ResponseStatus>
- {
- private long requestId;
- private AMQMethodBody responseMethodBody;
-
- public ResponseStatus(long requestId)
- {
- this.requestId = requestId;
- responseMethodBody = null;
- }
-
- public int compareTo(ResponseStatus o)
- {
- return (int)(requestId - o.requestId);
- }
-
- public String toString()
- {
- // Need to define this
- return "";
- }
- }
-
- private ConcurrentHashMap<Long, ResponseStatus> responseMap;
-
- public ResponseManager(long connectionId, int channel, boolean serverFlag)
- {
- this.channel = channel;
- this.serverFlag = serverFlag;
- this.connectionId = connectionId;
- responseIdCount = 1L;
- lastReceivedRequestId = 0L;
- maxAccumulatedResponses = ClientConfiguration.get().getInt(QpidConstants.MAX_ACCUMILATED_RESPONSES);
- responseMap = new ConcurrentHashMap<Long, ResponseStatus>();
- }
-
- // *** Functions to handle an incoming request ***
-
- public AMQPMethodEvent requestReceived(AMQRequestBody requestBody) throws Exception
- {
- long requestId = requestBody.getRequestId();
- if (logger.isDebugEnabled())
- {
- logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " +
- requestBody + "; " + requestBody.getMethodPayload());
- }
- long responseMark = requestBody.getResponseMark();
- lastReceivedRequestId = requestId;
- responseMap.put(requestId, new ResponseStatus(requestId));
-
- AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel,
- requestBody.getMethodPayload(), requestId);
-
- return methodEvent;
- }
-
- public List<AMQFrame> sendResponse(AMQPMethodEvent evt)
- throws RequestResponseMappingException
- {
- long requestId = evt.getCorrelationId();
- AMQMethodBody responseMethodBody = evt.getMethod();
-
- if (logger.isDebugEnabled())
- {
- logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
- "] TX RES: Res[# " + requestId + "]; " + responseMethodBody);
- }
-
- ResponseStatus responseStatus = responseMap.get(requestId);
- if (responseStatus == null)
- {
- throw new RequestResponseMappingException(requestId,
- "Failed to locate requestId " + requestId + " in responseMap." + responseMap);
- }
- if (responseStatus.responseMethodBody != null)
- {
- throw new RequestResponseMappingException(requestId, "RequestId " +
- requestId + " already has a response in responseMap.");
- }
- responseStatus.responseMethodBody = responseMethodBody;
- return doBatches();
- }
-
- // *** Management functions ***
-
- /**
- * Sends batched responses - i.e. all those members of responseMap that have
- * received a response.
- */
- public synchronized List<AMQFrame> doBatches()
- {
- long startRequestId = 0;
- int numAdditionalRequestIds = 0;
- Class responseMethodBodyClass = null;
- List<AMQFrame> frames = new ArrayList<AMQFrame>();
- Iterator<Long> lItr = responseMap.keySet().iterator();
- while (lItr.hasNext())
- {
- long requestId = lItr.next();
- ResponseStatus responseStatus = responseMap.get(requestId);
- if (responseStatus.responseMethodBody != null)
- {
- frames.add(sendResponseBatchFrame(requestId, 0, responseStatus.responseMethodBody));
- lItr.remove();
- }
- }
-
- return frames;
- }
-
- /**
- * Total number of entries in the responseMap - including both those that
- * are outstanding (i.e. no response has been received) and those that are
- * batched (those for which responses have been received but have not yet
- * been collected together and sent).
- */
- public int responsesMapSize()
- {
- return responseMap.size();
- }
-
- /**
- * As the responseMap may contain both outstanding responses (those with
- * ResponseStatus.responseMethodBody still null) and responses waiting to
- * be batched (those with ResponseStatus.responseMethodBody not null), we
- * need to count only those in the map with responseMethodBody null.
- */
- public int outstandingResponses()
- {
- int cnt = 0;
- for (Long requestId : responseMap.keySet())
- {
- if (responseMap.get(requestId).responseMethodBody == null)
- cnt++;
- }
- return cnt;
- }
-
- /**
- * As the responseMap may contain both outstanding responses (those with
- * ResponseStatus.responseMethodBody still null) and responses waiting to
- * be batched (those with ResponseStatus.responseMethodBody not null), we
- * need to count only those in the map with responseMethodBody not null.
- */
- public int batchedResponses()
- {
- int cnt = 0;
- for (Long requestId : responseMap.keySet())
- {
- if (responseMap.get(requestId).responseMethodBody != null)
- cnt++;
- }
- return cnt;
- }
-
- // *** Private helper functions ***
-
- private long getNextResponseId()
- {
- return responseIdCount++;
- }
-
- private AMQFrame sendResponseBatchFrame(long firstRequestId, int numAdditionalRequests,
- AMQMethodBody responseMethodBody)
- {
- long responseId = getNextResponseId(); // Get new response ID
- AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId,
- firstRequestId, numAdditionalRequests, responseMethodBody);
- return responseFrame;
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
deleted file mode 100644
index d79525c5b2..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.nclient.message;
-
-import java.util.LinkedList;
-import java.util.List;
-
-public class AMQPApplicationMessage {
-
- private int bytesReceived = 0;
- private int channelId;
- private byte[] referenceId;
- private List<byte[]> contents = new LinkedList<byte[]>();
- private long deliveryTag;
- private boolean redeliveredFlag;
- private MessageHeaders messageHeaders;
-
- public AMQPApplicationMessage(int channelId, byte[] referenceId)
- {
- this.channelId = channelId;
- this.referenceId = referenceId;
- }
-
- public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag)
- {
- this.channelId = channelId;
- this.deliveryTag = deliveryTag;
- this.messageHeaders = messageHeaders;
- this.redeliveredFlag = redeliveredFlag;
- }
-
- public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content, boolean redeliveredFlag)
- {
- this.channelId = channelId;
- this.deliveryTag = deliveryTag;
- this.messageHeaders = messageHeaders;
- this.redeliveredFlag = redeliveredFlag;
- addContent(content);
- }
-
- public void addContent(byte[] content)
- {
- contents.add(content);
- bytesReceived += content.length;
- }
-
- public int getBytesReceived()
- {
- return bytesReceived;
- }
-
- public int getChannelId()
- {
- return channelId;
- }
-
- public byte[] getReferenceId()
- {
- return referenceId;
- }
-
- public List<byte[]> getContents()
- {
- return contents;
- }
-
- public long getDeliveryTag()
- {
- return deliveryTag;
- }
-
- public boolean getRedeliveredFlag()
- {
- return redeliveredFlag;
- }
-
- public MessageHeaders getMessageHeaders()
- {
- return messageHeaders;
- }
-
- public String toString()
- {
- return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" +
- deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" +
- new String(contents.get(0));
- }
-
- public void setDeliveryTag(long deliveryTag)
- {
- this.deliveryTag = deliveryTag;
- }
-
- public void setMessageHeaders(MessageHeaders messageHeaders)
- {
- this.messageHeaders = messageHeaders;
- }
-
- public void setRedeliveredFlag(boolean redeliveredFlag)
- {
- this.redeliveredFlag = redeliveredFlag;
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java
deleted file mode 100644
index 562aa7b06e..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java
+++ /dev/null
@@ -1,679 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-
-package org.apache.qpid.nclient.message;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
-import java.util.Enumeration;
-
-public class MessageHeaders
-{
- private static final Logger _logger = Logger.getLogger(MessageHeaders.class);
-
- private AMQShortString _contentType;
-
- private AMQShortString _encoding;
-
- private AMQShortString _destination;
-
- private AMQShortString _exchange;
-
- private FieldTable _jmsHeaders;
-
- private short _deliveryMode;
-
- private short _priority;
-
- private AMQShortString _correlationId;
-
- private AMQShortString _replyTo;
-
- private long _expiration;
-
- private AMQShortString _messageId;
-
- private long _timestamp;
-
- private AMQShortString _type;
-
- private AMQShortString _userId;
-
- private AMQShortString _appId;
-
- private AMQShortString _transactionId;
-
- private AMQShortString _routingKey;
-
- private int _size;
-
- public int getSize()
- {
- return _size;
- }
-
- public void setSize(int size)
- {
- this._size = size;
- }
-
- public MessageHeaders()
- {
- }
-
- public AMQShortString getContentType()
- {
- return _contentType;
- }
-
- public void setContentType(AMQShortString contentType)
- {
- _contentType = contentType;
- }
-
- public AMQShortString getEncoding()
- {
- return _encoding;
- }
-
- public void setEncoding(AMQShortString encoding)
- {
- _encoding = encoding;
- }
-
- public FieldTable getJMSHeaders()
- {
- if (_jmsHeaders == null)
- {
- setJMSHeaders(FieldTableFactory.newFieldTable());
- }
-
- return _jmsHeaders;
- }
-
- public void setJMSHeaders(FieldTable headers)
- {
- _jmsHeaders = headers;
- }
-
-
- public short getDeliveryMode()
- {
- return _deliveryMode;
- }
-
- public void setDeliveryMode(short deliveryMode)
- {
- _deliveryMode = deliveryMode;
- }
-
- public short getPriority()
- {
- return _priority;
- }
-
- public void setPriority(short priority)
- {
- _priority = priority;
- }
-
- public AMQShortString getCorrelationId()
- {
- return _correlationId;
- }
-
- public void setCorrelationId(AMQShortString correlationId)
- {
- _correlationId = correlationId;
- }
-
- public AMQShortString getReplyTo()
- {
- return _replyTo;
- }
-
- public void setReplyTo(AMQShortString replyTo)
- {
- _replyTo = replyTo;
- }
-
- public long getExpiration()
- {
- return _expiration;
- }
-
- public void setExpiration(long expiration)
- {
- _expiration = expiration;
- }
-
-
- public AMQShortString getMessageId()
- {
- return _messageId;
- }
-
- public void setMessageId(AMQShortString messageId)
- {
- _messageId = messageId;
- }
-
- public long getTimestamp()
- {
- return _timestamp;
- }
-
- public void setTimestamp(long timestamp)
- {
- _timestamp = timestamp;
- }
-
- public AMQShortString getType()
- {
- return _type;
- }
-
- public void setType(AMQShortString type)
- {
- _type = type;
- }
-
- public AMQShortString getUserId()
- {
- return _userId;
- }
-
- public void setUserId(AMQShortString userId)
- {
- _userId = userId;
- }
-
- public AMQShortString getAppId()
- {
- return _appId;
- }
-
- public void setAppId(AMQShortString appId)
- {
- _appId = appId;
- }
-
- // MapMessage Interface
-
- public boolean getBoolean(AMQShortString string) throws JMSException
- {
- Boolean b = getJMSHeaders().getBoolean(string);
-
- if (b == null)
- {
- if (getJMSHeaders().containsKey(string))
- {
- Object str = getJMSHeaders().getObject(string);
-
- if (str == null || !(str instanceof AMQShortString))
- {
- throw new MessageFormatException("getBoolean can't use " + string + " item.");
- }
- else
- {
- return Boolean.valueOf(((AMQShortString)str).asString());
- }
- }
- else
- {
- b = Boolean.valueOf(null);
- }
- }
-
- return b;
- }
-
- public char getCharacter(AMQShortString string) throws JMSException
- {
- Character c = getJMSHeaders().getCharacter(string);
-
- if (c == null)
- {
- if (getJMSHeaders().isNullStringValue(string.asString()))
- {
- throw new NullPointerException("Cannot convert null char");
- }
- else
- {
- throw new MessageFormatException("getChar can't use " + string + " item.");
- }
- }
- else
- {
- return (char) c;
- }
- }
-
- public byte[] getBytes(AMQShortString string) throws JMSException
- {
- byte[] bs = getJMSHeaders().getBytes(string);
-
- if (bs == null)
- {
- throw new MessageFormatException("getBytes can't use " + string + " item.");
- }
- else
- {
- return bs;
- }
- }
-
- public byte getByte(AMQShortString string) throws JMSException
- {
- Byte b = getJMSHeaders().getByte(string);
- if (b == null)
- {
- if (getJMSHeaders().containsKey(string))
- {
- Object str = getJMSHeaders().getObject(string);
-
- if (str == null || !(str instanceof AMQShortString))
- {
- throw new MessageFormatException("getByte can't use " + string + " item.");
- }
- else
- {
- return Byte.valueOf(((AMQShortString)str).asString());
- }
- }
- else
- {
- b = Byte.valueOf(null);
- }
- }
-
- return b;
- }
-
- public short getShort(AMQShortString string) throws JMSException
- {
- Short s = getJMSHeaders().getShort(string);
-
- if (s == null)
- {
- s = Short.valueOf(getByte(string));
- }
-
- return s;
- }
-
- public int getInteger(AMQShortString string) throws JMSException
- {
- Integer i = getJMSHeaders().getInteger(string);
-
- if (i == null)
- {
- i = Integer.valueOf(getShort(string));
- }
-
- return i;
- }
-
- public long getLong(AMQShortString string) throws JMSException
- {
- Long l = getJMSHeaders().getLong(string);
-
- if (l == null)
- {
- l = Long.valueOf(getInteger(string));
- }
-
- return l;
- }
-
- public float getFloat(AMQShortString string) throws JMSException
- {
- Float f = getJMSHeaders().getFloat(string);
-
- if (f == null)
- {
- if (getJMSHeaders().containsKey(string))
- {
- Object str = getJMSHeaders().getObject(string);
-
- if (str == null || !(str instanceof AMQShortString))
- {
- throw new MessageFormatException("getFloat can't use " + string + " item.");
- }
- else
- {
- return Float.valueOf(((AMQShortString)str).asString());
- }
- }
- else
- {
- f = Float.valueOf(null);
- }
-
- }
-
- return f;
- }
-
- public double getDouble(AMQShortString string) throws JMSException
- {
- Double d = getJMSHeaders().getDouble(string);
-
- if (d == null)
- {
- d = Double.valueOf(getFloat(string));
- }
-
- return d;
- }
-
- public AMQShortString getString(AMQShortString string) throws JMSException
- {
- AMQShortString s = new AMQShortString(getJMSHeaders().getString(string.asString()));
-
- if (s == null)
- {
- if (getJMSHeaders().containsKey(string))
- {
- Object o = getJMSHeaders().getObject(string);
- if (o instanceof byte[])
- {
- throw new MessageFormatException("getObject couldn't find " + string + " item.");
- }
- else
- {
- if (o == null)
- {
- return null;
- }
- else
- {
- s = (AMQShortString) o;
- }
- }
- }
- }
-
- return s;
- }
-
- public Object getObject(AMQShortString string) throws JMSException
- {
- return getJMSHeaders().getObject(string);
- }
-
- public void setBoolean(AMQShortString string, boolean b) throws JMSException
- {
- checkPropertyName(string);
- getJMSHeaders().setBoolean(string, b);
- }
-
- public void setChar(AMQShortString string, char c) throws JMSException
- {
- checkPropertyName(string);
- getJMSHeaders().setChar(string, c);
- }
-
- public Object setBytes(AMQShortString string, byte[] bytes)
- {
- return getJMSHeaders().setBytes(string, bytes);
- }
-
- public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
- {
- return getJMSHeaders().setBytes(string, bytes, start, length);
- }
-
- public void setByte(AMQShortString string, byte b) throws JMSException
- {
- checkPropertyName(string);
- getJMSHeaders().setByte(string, b);
- }
-
- public void setShort(AMQShortString string, short i) throws JMSException
- {
- checkPropertyName(string);
- getJMSHeaders().setShort(string, i);
- }
-
- public void setInteger(AMQShortString string, int i) throws JMSException
- {
- checkPropertyName(string);
- getJMSHeaders().setInteger(string, i);
- }
-
- public void setLong(AMQShortString string, long l) throws JMSException
- {
- checkPropertyName(string);
- getJMSHeaders().setLong(string, l);
- }
-
- public void setFloat(AMQShortString string, float v) throws JMSException
- {
- checkPropertyName(string);
- getJMSHeaders().setFloat(string, v);
- }
-
- public void setDouble(AMQShortString string, double v) throws JMSException
- {
- checkPropertyName(string);
- getJMSHeaders().setDouble(string, v);
- }
-
- public void setString(AMQShortString string, AMQShortString string1) throws JMSException
- {
- checkPropertyName(string);
- getJMSHeaders().setString(string.asString(), string1.asString());
- }
-
- public void setObject(AMQShortString string, Object object) throws JMSException
- {
- checkPropertyName(string);
- try
- {
- getJMSHeaders().setObject(string, object);
- }
- catch (AMQPInvalidClassException aice)
- {
- throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
- }
- }
-
- public boolean itemExists(AMQShortString string) throws JMSException
- {
- return getJMSHeaders().containsKey(string);
- }
-
- public Enumeration getPropertyNames()
- {
- return getJMSHeaders().getPropertyNames();
- }
-
- public void clear()
- {
- getJMSHeaders().clear();
- }
-
- public boolean propertyExists(AMQShortString propertyName)
- {
- return getJMSHeaders().propertyExists(propertyName);
- }
-
- public Object put(Object key, Object value)
- {
- return getJMSHeaders().setObject(key.toString(), value);
- }
-
- public Object remove(AMQShortString propertyName)
- {
- return getJMSHeaders().remove(propertyName);
- }
-
- public boolean isEmpty()
- {
- return getJMSHeaders().isEmpty();
- }
-
- public void writeToBuffer(ByteBuffer data)
- {
- getJMSHeaders().writeToBuffer(data);
- }
-
- public Enumeration getMapNames()
- {
- return getPropertyNames();
- }
-
- protected static void checkPropertyName(CharSequence propertyName)
- {
- if (propertyName == null)
- {
- throw new IllegalArgumentException("Property name must not be null");
- }
- else if (propertyName.length() == 0)
- {
- throw new IllegalArgumentException("Property name must not be the empty string");
- }
-
- checkIdentiferFormat(propertyName);
- }
-
- protected static void checkIdentiferFormat(CharSequence propertyName)
- {
-// JMS requirements 3.5.1 Property Names
-// Identifiers:
-// - An identifier is an unlimited-length character sequence that must begin
-// with a Java identifier start character; all following characters must be Java
-// identifier part characters. An identifier start character is any character for
-// which the method Character.isJavaIdentifierStart returns true. This includes
-// '_' and '$'. An identifier part character is any character for which the
-// method Character.isJavaIdentifierPart returns true.
-// - Identifiers cannot be the names NULL, TRUE, or FALSE.
-// – Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
-// ESCAPE.
-// – Identifiers are either header field references or property references. The
-// type of a property value in a message selector corresponds to the type
-// used to set the property. If a property that does not exist in a message is
-// referenced, its value is NULL. The semantics of evaluating NULL values
-// in a selector are described in Section 3.8.1.2, “Null Values.”
-// – The conversions that apply to the get methods for properties do not
-// apply when a property is used in a message selector expression. For
-// example, suppose you set a property as a string value, as in the
-// following:
-// myMessage.setStringProperty("NumberOfOrders", "2");
-// The following expression in a message selector would evaluate to false,
-// because a string cannot be used in an arithmetic expression:
-// "NumberOfOrders > 1"
-// – Identifiers are case sensitive.
-// – Message header field references are restricted to JMSDeliveryMode,
-// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
-// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
-// null and if so are treated as a NULL value.
-
- if (Boolean.getBoolean("strict-jms"))
- {
- // JMS start character
- if (!(Character.isJavaIdentifierStart(propertyName.charAt(0))))
- {
- throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character");
- }
-
- // JMS part character
- int length = propertyName.length();
- for (int c = 1; c < length; c++)
- {
- if (!(Character.isJavaIdentifierPart(propertyName.charAt(c))))
- {
- throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character");
- }
- }
-
-
-
-
- // JMS invalid names
- if ((propertyName.equals("NULL")
- || propertyName.equals("TRUE")
- || propertyName.equals("FALSE")
- || propertyName.equals("NOT")
- || propertyName.equals("AND")
- || propertyName.equals("OR")
- || propertyName.equals("BETWEEN")
- || propertyName.equals("LIKE")
- || propertyName.equals("IN")
- || propertyName.equals("IS")
- || propertyName.equals("ESCAPE")))
- {
- throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
- }
- }
-
- }
-
- public AMQShortString getTransactionId()
- {
- return _transactionId;
- }
-
- public void setTransactionId(AMQShortString id)
- {
- _transactionId = id;
- }
-
- public AMQShortString getDestination()
- {
- return _destination;
- }
-
- public void setDestination(AMQShortString destination)
- {
- this._destination = destination;
- }
-
- public AMQShortString getExchange()
- {
- return _exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
- this._exchange = exchange;
- }
-
- public AMQShortString getRoutingKey()
- {
- return _routingKey;
- }
-
- public void setRoutingKey(AMQShortString routingKey)
- {
- this._routingKey = routingKey;
- }
-}
-
-
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java
deleted file mode 100644
index 53a2142718..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.message;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.AbstractPhase;
-import org.apache.qpid.nclient.model.ModelPhase;
-
-public class MessagePhase extends AbstractPhase {
-
- private final BlockingQueue<AMQPApplicationMessage> _queue = new LinkedBlockingQueue<AMQPApplicationMessage>();
- private static final Logger _logger = Logger.getLogger(ModelPhase.class);
-
- public void messageReceived(Object msg) throws AMQPException
- {
- try
- {
- _queue.put((AMQPApplicationMessage)msg);
- }
- catch (InterruptedException e)
- {
- _logger.error("Error adding message to queue", e);
- }
- super.messageReceived(msg);
- }
-
- public void messageSent(Object msg) throws AMQPException
- {
- super.messageSent(msg);
- }
-
- public AMQPApplicationMessage getNextMessage()
- {
- return _queue.poll();
- }
-
- public AMQPApplicationMessage getNextMessage(long timeout, TimeUnit tu) throws InterruptedException
- {
- return _queue.poll(timeout, tu);
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
deleted file mode 100644
index efd7264f96..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.qpid.nclient.message;
-
-import org.apache.qpid.AMQException;
-
-public interface MessageStore {
-
- public void removeMessage(String identifier);
-
- public void storeContentBodyChunk(String identifier,byte[] contentBody) throws AMQException;
-
- public void storeMessageMetaData(String identifier, MessageHeaders messageHeaders) throws AMQException;
-
- public AMQPApplicationMessage getMessage(String identifier) throws AMQException;
-
- public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException;
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
deleted file mode 100644
index eb5a9c1778..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.apache.qpid.nclient.message;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.qpid.AMQException;
-
-public class TransientMessageStore implements MessageStore {
-
- private Map<String,AMQPApplicationMessage> messageMap = new ConcurrentHashMap<String,AMQPApplicationMessage>();
-
- public AMQPApplicationMessage getMessage(String identifier)
- throws AMQException
- {
- return messageMap.get(identifier);
- }
-
- public void removeMessage(String identifier)
- {
- messageMap.remove(identifier);
- }
-
- public void storeContentBodyChunk(String identifier, byte[] contentBody)
- throws AMQException
- {
-
- }
-
- public void storeMessageMetaData(String identifier,
- MessageHeaders messageHeaders) throws AMQException
- {
-
- }
-
- public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException
- {
-
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
deleted file mode 100644
index d845059ee7..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package org.apache.qpid.nclient.model;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
-import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.AbstractPhase;
-import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.core.PhaseContext;
-import org.apache.qpid.nclient.core.QpidConstants;
-
-/**
- * This Phase handles Layer 3 functionality of the AMQP spec.
- * This class acts as the interface between the API and the pipeline
- */
-public class ModelPhase extends AbstractPhase {
-
- private static final Logger _logger = Logger.getLogger(ModelPhase.class);
-
- private Map <Class,List> _methodListners = new HashMap<Class,List>();
-
- /**
- * ------------------------------------------------
- * Phase - methods introduced by Phase
- * ------------------------------------------------
- */
- public void init(PhaseContext ctx, Phase nextInFlowPhase, Phase nextOutFlowPhase)
- {
- super.init(ctx, nextInFlowPhase, nextOutFlowPhase);
- }
-
- public void messageReceived(Object msg) throws AMQPException
- {
- notifyMethodListerners((AMQPMethodEvent)msg);
-
- // not doing super.methodReceived here, as this is the end of
- // the pipeline
- //super.messageReceived(msg);
- }
-
- /**
- * This method should only except and pass messages
- * of Type @see AMQPMethodEvent
- */
- public void messageSent(Object msg) throws AMQPException
- {
- super.messageSent(msg);
- }
-
- /**
- * ------------------------------------------------
- * Event Handling
- * ------------------------------------------------
- */
-
- public void notifyMethodListerners(AMQPMethodEvent event) throws AMQPException
- {
- AMQPEventManager eventManager = (AMQPEventManager)_ctx.getProperty(QpidConstants.EVENT_MANAGER);
- eventManager.notifyEvent(event);
- }
-
- /**
- * ------------------------------------------------
- * Configuration
- * ------------------------------------------------
- */
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java
deleted file mode 100644
index fc5878f6ef..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.security;
-
-import javax.security.auth.callback.CallbackHandler;
-
-import org.apache.qpid.nclient.transport.ConnectionURL;
-
-public interface AMQPCallbackHandler extends CallbackHandler
-{
- void initialise(ConnectionURL url);
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
deleted file mode 100644
index 428cd6753d..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.security;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.nclient.config.ClientConfiguration;
-import org.apache.qpid.nclient.core.QpidConstants;
-
-public class CallbackHandlerRegistry
-{
- private static final Logger _logger = Logger.getLogger(CallbackHandlerRegistry.class);
-
- private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry();
-
- private Map<String,Class> _mechanismToHandlerClassMap = new HashMap<String,Class>();
-
- private String _mechanisms;
-
- public static CallbackHandlerRegistry getInstance()
- {
- return _instance;
- }
-
- public Class getCallbackHandlerClass(String mechanism)
- {
- return _mechanismToHandlerClassMap.get(mechanism);
- }
-
- public String getMechanisms()
- {
- return _mechanisms;
- }
-
- private CallbackHandlerRegistry()
- {
- // first we register any Sasl client factories
- DynamicSaslRegistrar.registerSaslProviders();
- parseProperties();
- }
-
- private void parseProperties()
- {
- String key = QpidConstants.AMQP_SECURITY + "." +
- QpidConstants.AMQP_SECURITY_MECHANISMS + "." +
- QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER;
-
- int index = ClientConfiguration.get().getMaxIndex(key);
-
- for (int i=0; i<index+1;i++)
- {
- String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]");
- String className = ClientConfiguration.get().getString(key + "(" + i + ")" );
- Class clazz = null;
- try
- {
- clazz = Class.forName(className);
- if (!AMQPCallbackHandler.class.isAssignableFrom(clazz))
- {
- _logger.warn("SASL provider " + clazz + " does not implement " + AMQPCallbackHandler.class +
- ". Skipping");
- continue;
- }
- _mechanismToHandlerClassMap.put(mechanism, clazz);
- if (_mechanisms == null)
- {
- _mechanisms = mechanism;
- }
- else
- {
- // one time cost
- _mechanisms = _mechanisms + " " + mechanism;
- }
- }
- catch (ClassNotFoundException ex)
- {
- _logger.warn("Unable to load class " + className + ". Skipping that SASL provider");
- continue;
- }
- }
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java
deleted file mode 100644
index 958c6c4782..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.security;
-
-import java.security.Security;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import javax.security.sasl.SaslClientFactory;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.nclient.config.ClientConfiguration;
-import org.apache.qpid.nclient.core.QpidConstants;
-
-public class DynamicSaslRegistrar
-{
- private static final Logger _logger = Logger.getLogger(DynamicSaslRegistrar.class);
-
- public static void registerSaslProviders()
- {
- Map<String, Class<? extends SaslClientFactory>> factories = parseProperties();
- if (factories.size() > 0)
- {
- Security.addProvider(new JCAProvider(factories));
- _logger.debug("Dynamic SASL provider added as a security provider");
- }
- }
-
- private static Map<String, Class<? extends SaslClientFactory>> parseProperties()
- {
- List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES);
- TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister =
- new TreeMap<String, Class<? extends SaslClientFactory>>();
- for (String mechanism: mechanisms)
- {
- String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY + "_" + mechanism);
- try
- {
- Class<?> clazz = Class.forName(className);
- if (!(SaslClientFactory.class.isAssignableFrom(clazz)))
- {
- _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping");
- continue;
- }
- factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz);
- }
- catch (Exception ex)
- {
- _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping");
- }
- }
- return factoriesToRegister;
- }
-
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java
deleted file mode 100644
index 10ccb88821..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.security;
-
-import javax.security.sasl.SaslClientFactory;
-import java.security.Provider;
-import java.security.Security;
-import java.util.Map;
-
-public class JCAProvider extends Provider
-{
- public JCAProvider(Map<String, Class<? extends SaslClientFactory>> providerMap)
- {
- super("AMQSASLProvider", 1.0, "A JCA provider that registers all " +
- "AMQ SASL providers that want to be registered");
- register(providerMap);
- Security.addProvider(this);
- }
-
- private void register(Map<String, Class<? extends SaslClientFactory>> providerMap)
- {
- for (Map.Entry<String, Class<? extends SaslClientFactory>> me :
- providerMap.entrySet())
- {
- put("SaslClientFactory." + me.getKey(), me.getValue().getName());
- }
- }
-} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java
deleted file mode 100644
index 7297d07134..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.security;
-
-import java.io.IOException;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-
-import org.apache.qpid.nclient.transport.ConnectionURL;
-
-public class UsernamePasswordCallbackHandler implements AMQPCallbackHandler
-{
- private ConnectionURL _url;
-
- public void initialise(ConnectionURL url)
- {
- _url = url;
- }
-
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
- {
- for (int i = 0; i < callbacks.length; i++)
- {
- Callback cb = callbacks[i];
- if (cb instanceof NameCallback)
- {
- ((NameCallback)cb).setName(_url.getUsername());
- }
- else if (cb instanceof PasswordCallback)
- {
- ((PasswordCallback)cb).setPassword((_url.getPassword()).toCharArray());
- }
- else
- {
- throw new UnsupportedCallbackException(cb);
- }
- }
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java
deleted file mode 100644
index 1097346c1d..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.security.amqplain;
-
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.Callback;
-
-/**
- * Implements the "AMQPlain" authentication protocol that uses FieldTables to send username and pwd.
- *
- */
-public class AmqPlainSaslClient implements SaslClient
-{
- /**
- * The name of this mechanism
- */
- public static final String MECHANISM = "AMQPLAIN";
-
- private CallbackHandler _cbh;
-
- public AmqPlainSaslClient(CallbackHandler cbh)
- {
- _cbh = cbh;
- }
-
- public String getMechanismName()
- {
- return "AMQPLAIN";
- }
-
- public boolean hasInitialResponse()
- {
- return true;
- }
-
- public byte[] evaluateChallenge(byte[] challenge) throws SaslException
- {
- // we do not care about the prompt or the default name
- NameCallback nameCallback = new NameCallback("prompt", "defaultName");
- PasswordCallback pwdCallback = new PasswordCallback("prompt", false);
- Callback[] callbacks = new Callback[]{nameCallback, pwdCallback};
- try
- {
- _cbh.handle(callbacks);
- }
- catch (Exception e)
- {
- throw new SaslException("Error handling SASL callbacks: " + e, e);
- }
- FieldTable table = FieldTableFactory.newFieldTable();
- table.setString("LOGIN", nameCallback.getName());
- table.setString("PASSWORD", new String(pwdCallback.getPassword()));
- return table.getDataAsBytes();
- }
-
- public boolean isComplete()
- {
- return true;
- }
-
- public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException
- {
- throw new SaslException("Not supported");
- }
-
- public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException
- {
- throw new SaslException("Not supported");
- }
-
- public Object getNegotiatedProperty(String propName)
- {
- return null;
- }
-
- public void dispose() throws SaslException
- {
- _cbh = null;
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java
deleted file mode 100644
index f98c1e3a58..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.security.amqplain;
-
-import javax.security.sasl.SaslClientFactory;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.Sasl;
-import javax.security.auth.callback.CallbackHandler;
-import java.util.Map;
-
-public class AmqPlainSaslClientFactory implements SaslClientFactory
-{
- public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException
- {
- for (int i = 0; i < mechanisms.length; i++)
- {
- if (mechanisms[i].equals(AmqPlainSaslClient.MECHANISM))
- {
- if (cbh == null)
- {
- throw new SaslException("CallbackHandler must not be null");
- }
- return new AmqPlainSaslClient(cbh);
- }
- }
- return null;
- }
-
- public String[] getMechanismNames(Map props)
- {
- if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
- props.containsKey(Sasl.POLICY_NODICTIONARY) ||
- props.containsKey(Sasl.POLICY_NOACTIVE))
- {
- // returned array must be non null according to interface documentation
- return new String[0];
- }
- else
- {
- return new String[]{AmqPlainSaslClient.MECHANISM};
- }
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java
deleted file mode 100644
index 9e878fb839..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.transport;
-
-import org.apache.qpid.url.URLHelper;
-import org.apache.qpid.url.URLSyntaxException;
-
-import java.util.HashMap;
-import java.net.URISyntaxException;
-import java.net.URI;
-
-public class AMQPBrokerDetails implements BrokerDetails
-{
- private String _host;
-
- private int _port;
-
- private String _transport;
-
- private HashMap<String, String> _options;
-
- public AMQPBrokerDetails()
- {
- _options = new HashMap<String, String>();
- }
-
- public AMQPBrokerDetails(String url) throws URLSyntaxException
- {
- this();
- // URL should be of format tcp://host:port?option='value',option='value'
- try
- {
- URI connection = new URI(url);
-
- String transport = connection.getScheme();
-
- // Handles some defaults to minimise changes to existing broker URLS e.g. localhost
- if (transport != null)
- {
- //todo this list of valid transports should be enumerated somewhere
- if ((!(transport.equalsIgnoreCase("vm") || transport.equalsIgnoreCase("tcp"))))
- {
- if (transport.equalsIgnoreCase("localhost"))
- {
- connection = new URI(DEFAULT_TRANSPORT + "://" + url);
- transport = connection.getScheme();
- }
- else
- {
- if (url.charAt(transport.length()) == ':' && url.charAt(transport.length() + 1) != '/')
- {
- //Then most likely we have a host:port value
- connection = new URI(DEFAULT_TRANSPORT + "://" + url);
- transport = connection.getScheme();
- }
- else
- {
- URLHelper.parseError(0, transport.length(), "Unknown transport", url);
- }
- }
- }
- }
- else
- {
- //Default the transport
- connection = new URI(DEFAULT_TRANSPORT + "://" + url);
- transport = connection.getScheme();
- }
-
- if (transport == null)
- {
- URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" + " In broker URL:'" + url
- + "' Format: " + URL_FORMAT_EXAMPLE, "");
- }
-
- setTransport(transport);
-
- String host = connection.getHost();
-
- // Fix for Java 1.5
- if (host == null)
- {
- host = "";
- }
-
- setHost(host);
-
- int port = connection.getPort();
-
- if (port == -1)
- {
- // Fix for when there is port data but it is not automatically parseable by getPort().
- String auth = connection.getAuthority();
-
- if (auth != null && auth.contains(":"))
- {
- int start = auth.indexOf(":") + 1;
- int end = start;
- boolean looking = true;
- boolean found = false;
- //Walk the authority looking for a port value.
- while (looking)
- {
- try
- {
- end++;
- Integer.parseInt(auth.substring(start, end));
-
- if (end >= auth.length())
- {
- looking = false;
- found = true;
- }
- }
- catch (NumberFormatException nfe)
- {
- looking = false;
- }
-
- }
- if (found)
- {
- setPort(Integer.parseInt(auth.substring(start, end)));
- }
- else
- {
- URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1,
- "Illegal character in port number", connection.toString());
- }
-
- }
- else
- {
- setPort(DEFAULT_PORT);
- }
- }
- else
- {
- setPort(port);
- }
-
- String queryString = connection.getQuery();
-
- URLHelper.parseOptions(_options, queryString);
-
- //Fragment is #string (not used)
- }
- catch (URISyntaxException uris)
- {
- if (uris instanceof URLSyntaxException)
- {
- throw (URLSyntaxException) uris;
- }
-
- URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
- }
- }
-
- public AMQPBrokerDetails(String host, int port, boolean useSSL)
- {
- _host = host;
- _port = port;
-
- if (useSSL)
- {
- setOption(OPTIONS_SSL, "true");
- }
- }
-
- public String getHost()
- {
- return _host;
- }
-
- public void setHost(String _host)
- {
- this._host = _host;
- }
-
- public int getPort()
- {
- return _port;
- }
-
- public void setPort(int _port)
- {
- this._port = _port;
- }
-
- public String getTransport()
- {
- return _transport;
- }
-
- public void setTransport(String _transport)
- {
- this._transport = _transport;
- }
-
- public String getOption(String key)
- {
- return _options.get(key);
- }
-
- public void setOption(String key, String value)
- {
- _options.put(key, value);
- }
-
- public long getTimeout()
- {
- if (_options.containsKey(OPTIONS_CONNECT_TIMEOUT))
- {
- try
- {
- return Long.parseLong(_options.get(OPTIONS_CONNECT_TIMEOUT));
- }
- catch (NumberFormatException nfe)
- {
- //Do nothing as we will use the default below.
- }
- }
-
- return BrokerDetails.DEFAULT_CONNECT_TIMEOUT;
- }
-
- public void setTimeout(long timeout)
- {
- setOption(OPTIONS_CONNECT_TIMEOUT, Long.toString(timeout));
- }
-
- public String toString()
- {
- StringBuffer sb = new StringBuffer();
-
- sb.append(_transport);
- sb.append("://");
-
- if (!(_transport.equalsIgnoreCase("vm")))
- {
- sb.append(_host);
- }
-
- sb.append(':');
- sb.append(_port);
-
- sb.append(printOptionsURL());
-
- return sb.toString();
- }
-
- public boolean equals(Object o)
- {
- if (!(o instanceof BrokerDetails))
- {
- return false;
- }
-
- BrokerDetails bd = (BrokerDetails) o;
-
- return _host.equalsIgnoreCase(bd.getHost()) && (_port == bd.getPort())
- && _transport.equalsIgnoreCase(bd.getTransport()) && (useSSL() == bd.useSSL());
-
- //todo do we need to compare all the options as well?
- }
-
- private String printOptionsURL()
- {
- StringBuffer optionsURL = new StringBuffer();
-
- optionsURL.append('?');
-
- if (!(_options.isEmpty()))
- {
-
- for (String key : _options.keySet())
- {
- optionsURL.append(key);
-
- optionsURL.append("='");
-
- optionsURL.append(_options.get(key));
-
- optionsURL.append("'");
-
- optionsURL.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
- }
- }
-
- //removeKey the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
- optionsURL.deleteCharAt(optionsURL.length() - 1);
-
- return optionsURL.toString();
- }
-
- public boolean useSSL()
- {
- // To be friendly to users we should be case insensitive.
- // or simply force users to conform to OPTIONS_SSL
- // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL
-
- if (_options.containsKey(OPTIONS_SSL))
- {
- return _options.get(OPTIONS_SSL).equalsIgnoreCase("true");
- }
-
- return USE_SSL_DEFAULT;
- }
-
- public void useSSL(boolean ssl)
- {
- setOption(OPTIONS_SSL, Boolean.toString(ssl));
- }
-
- public static String checkTransport(String broker)
- {
- if ((!broker.contains("://")))
- {
- return "tcp://" + broker;
- }
- else
- {
- return broker;
- }
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java
deleted file mode 100644
index d0259830c6..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.transport;
-
-import org.apache.qpid.url.URLHelper;
-import org.apache.qpid.url.URLSyntaxException;
-
-import java.util.*;
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-
-public class AMQPConnectionURL implements ConnectionURL
-{
- private String _url;
- private String _failoverMethod;
- private HashMap<String, String> _failoverOptions;
- private HashMap<String, String> _options;
- private List<BrokerDetails> _brokers;
- private String _clientName;
- private String _username;
- private String _password;
- private String _virtualHost;
-
- public AMQPConnectionURL(String fullURL) throws URLSyntaxException
- {
- _url = fullURL;
- _options = new HashMap<String, String>();
- _brokers = new LinkedList<BrokerDetails>();
- _failoverOptions = new HashMap<String, String>();
-
- try
- {
- URI connection = new URI(fullURL);
-
- if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL)))
- {
- throw new URISyntaxException(fullURL, "Not an AMQP URL");
- }
-
- if (connection.getHost() == null || connection.getHost().equals(""))
- {
- String uid = getUniqueClientID();
- if (uid == null)
- {
- URLHelper.parseError(-1, "Client Name not specified", fullURL);
- }
- else
- {
- setClientName(uid);
- }
-
- }
- else
- {
- setClientName(connection.getHost());
- }
-
- String userInfo = connection.getUserInfo();
-
- if (userInfo == null)
- {
- //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs
- userInfo = connection.getAuthority();
-
- if (userInfo != null)
- {
- int atIndex = userInfo.indexOf('@');
-
- if (atIndex != -1)
- {
- userInfo = userInfo.substring(0, atIndex);
- }
- else
- {
- userInfo = null;
- }
- }
-
- }
-
- if (userInfo == null)
- {
- URLHelper.parseError(AMQ_PROTOCOL.length() + 3,
- "User information not found on url", fullURL);
- }
- else
- {
- parseUserInfo(userInfo);
- }
- String virtualHost = connection.getPath();
-
- if (virtualHost != null && (!virtualHost.equals("")))
- {
- setVirtualHost(virtualHost);
- }
- else
- {
- int authLength = connection.getAuthority().length();
- int start = AMQ_PROTOCOL.length() + 3;
- int testIndex = start + authLength;
- if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?')
- {
- URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL);
- }
- else
- {
- URLHelper.parseError(-1, "Virtual host not specified", fullURL);
- }
-
- }
-
-
- URLHelper.parseOptions(_options, connection.getQuery());
-
- processOptions();
-
- //Fragment is #string (not used)
- //System.out.println(connection.getFragment());
-
- }
- catch (URISyntaxException uris)
- {
- if (uris instanceof URLSyntaxException)
- {
- throw (URLSyntaxException) uris;
- }
-
- int slash = fullURL.indexOf("\\");
-
- if (slash == -1)
- {
- URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
- }
- else
- {
- if (slash != 0 && fullURL.charAt(slash - 1) == ':')
- {
- URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL);
- }
- else
- {
- URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL);
- }
- }
-
- }
- }
-
- private String getUniqueClientID()
- {
- try
- {
- InetAddress addr = InetAddress.getLocalHost();
- return addr.getHostName() + System.currentTimeMillis();
- }
- catch (UnknownHostException e)
- {
- return null;
- }
- }
-
- private void parseUserInfo(String userinfo) throws URLSyntaxException
- {
- //user info = user:pass
-
- int colonIndex = userinfo.indexOf(':');
-
- if (colonIndex == -1)
- {
- URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(),
- "Null password in user information not allowed.", _url);
- }
- else
- {
- setUsername(userinfo.substring(0, colonIndex));
- setPassword(userinfo.substring(colonIndex + 1));
- }
-
- }
-
- private void processOptions() throws URLSyntaxException
- {
- if (_options.containsKey(OPTIONS_BROKERLIST))
- {
- String brokerlist = _options.get(OPTIONS_BROKERLIST);
-
- //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'
- StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR);
-
- while (st.hasMoreTokens())
- {
- String broker = st.nextToken();
-
- _brokers.add(new AMQPBrokerDetails(broker));
- }
-
- _options.remove(OPTIONS_BROKERLIST);
- }
-
- if (_options.containsKey(OPTIONS_FAILOVER))
- {
- String failover = _options.get(OPTIONS_FAILOVER);
-
- // failover='method?option='value',option='value''
-
- int methodIndex = failover.indexOf('?');
-
- if (methodIndex > -1)
- {
- _failoverMethod = failover.substring(0, methodIndex);
- URLHelper.parseOptions(_failoverOptions, failover.substring(methodIndex + 1));
- }
- else
- {
- _failoverMethod = failover;
- }
-
- _options.remove(OPTIONS_FAILOVER);
- }
- }
-
- public String getURL()
- {
- return _url;
- }
-
- public String getFailoverMethod()
- {
- return _failoverMethod;
- }
-
- public String getFailoverOption(String key)
- {
- return _failoverOptions.get(key);
- }
-
- public void setFailoverOption(String key, String value)
- {
- _failoverOptions.put(key, value);
- }
-
- public int getBrokerCount()
- {
- return _brokers.size();
- }
-
- public BrokerDetails getBrokerDetails(int index)
- {
- if (index < _brokers.size())
- {
- return _brokers.get(index);
- }
- else
- {
- return null;
- }
- }
-
- public void addBrokerDetails(BrokerDetails broker)
- {
- if (!(_brokers.contains(broker)))
- {
- _brokers.add(broker);
- }
- }
-
- public List<BrokerDetails> getAllBrokerDetails()
- {
- return _brokers;
- }
-
- public String getClientName()
- {
- return _clientName;
- }
-
- public void setClientName(String clientName)
- {
- _clientName = clientName;
- }
-
- public String getUsername()
- {
- return _username;
- }
-
- public void setUsername(String username)
- {
- _username = username;
- }
-
- public String getPassword()
- {
- return _password;
- }
-
- public void setPassword(String password)
- {
- _password = password;
- }
-
- public String getVirtualHost()
- {
- return _virtualHost;
- }
-
- public void setVirtualHost(String virtuaHost)
- {
- _virtualHost = virtuaHost;
- }
-
- public String getOption(String key)
- {
- return _options.get(key);
- }
-
- public void setOption(String key, String value)
- {
- _options.put(key, value);
- }
-
- public String toString()
- {
- StringBuffer sb = new StringBuffer();
-
- sb.append(AMQ_PROTOCOL);
- sb.append("://");
-
- if (_username != null)
- {
- sb.append(_username);
-
- if (_password != null)
- {
- sb.append(':');
- sb.append(_password);
- }
-
- sb.append('@');
- }
-
- sb.append(_clientName);
-
- sb.append(_virtualHost);
-
- sb.append(optionsToString());
-
- return sb.toString();
- }
-
- private String optionsToString()
- {
- StringBuffer sb = new StringBuffer();
-
- sb.append("?" + OPTIONS_BROKERLIST + "='");
-
- for (BrokerDetails service : _brokers)
- {
- sb.append(service.toString());
- sb.append(';');
- }
-
- sb.deleteCharAt(sb.length() - 1);
- sb.append("'");
-
- if (_failoverMethod != null)
- {
- sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
- sb.append(OPTIONS_FAILOVER + "='");
- sb.append(_failoverMethod);
- sb.append(URLHelper.printOptions(_failoverOptions));
- sb.append("'");
- }
-
- return sb.toString();
- }
-
-
- public static void main(String[] args) throws URLSyntaxException
- {
-
- String url2 = "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'";
- //"amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''";
-
- //ConnectionURL connectionurl2 = new AMQConnectionURL(url2);
-
- System.out.println(url2);
- //System.out.println(connectionurl2);
-
- }
-
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java
deleted file mode 100644
index fe20a1e8dd..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.transport;
-
-public interface BrokerDetails
-{
-
- /*
- * Known URL Options
- * @see ConnectionURL
- */
- public static final String OPTIONS_RETRY = "retries";
- public static final String OPTIONS_SSL = ConnectionURL.OPTIONS_SSL;
- public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
- public static final int DEFAULT_PORT = 5672;
-
- public static final String TCP = "tcp";
- public static final String VM = "vm";
-
- public static final String DEFAULT_TRANSPORT = TCP;
-
- public static final String URL_FORMAT_EXAMPLE =
- "<transport>://<hostname>[:<port Default=\"" + DEFAULT_PORT + "\">][?<option>='<value>'[,<option>='<value>']]";
-
- public static final long DEFAULT_CONNECT_TIMEOUT = 30000L;
- public static final boolean USE_SSL_DEFAULT = false;
-
- String getHost();
-
- void setHost(String host);
-
- int getPort();
-
- void setPort(int port);
-
- String getTransport();
-
- void setTransport(String transport);
-
- boolean useSSL();
-
- void useSSL(boolean ssl);
-
- String getOption(String key);
-
- void setOption(String key, String value);
-
- long getTimeout();
-
- void setTimeout(long timeout);
-
- String toString();
-
- boolean equals(Object o);
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java
deleted file mode 100644
index 79653dc442..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.transport;
-
-import java.util.List;
-
-/**
- Connection URL format
- For TCP:
- amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\'
-
- For VMBroker:
- vm://:3/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''"
-
- Options are of course optional except for requiring a single broker in the broker list.
- The option seperator is defined to be either '&' or ','
- */
-public interface ConnectionURL
-{
- public static final String AMQ_PROTOCOL = "amqp";
- public static final String OPTIONS_BROKERLIST = "brokerlist";
- public static final String OPTIONS_FAILOVER = "failover";
- public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount";
- public static final String OPTIONS_SSL = "ssl";
-
- String getURL();
-
- String getFailoverMethod();
-
- String getFailoverOption(String key);
-
- int getBrokerCount();
-
- BrokerDetails getBrokerDetails(int index);
-
- void addBrokerDetails(BrokerDetails broker);
-
- List<BrokerDetails> getAllBrokerDetails();
-
- String getClientName();
-
- void setClientName(String clientName);
-
- String getUsername();
-
- void setUsername(String username);
-
- String getPassword();
-
- void setPassword(String password);
-
- String getVirtualHost();
-
- void setVirtualHost(String virtualHost);
-
- String getOption(String key);
-
- void setOption(String key, String value);
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
deleted file mode 100644
index 734aa68a9d..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.qpid.nclient.transport;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.transport.socket.nio.SocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.qpid.nclient.config.ClientConfiguration;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.DefaultPhaseContext;
-import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.core.PhaseContext;
-import org.apache.qpid.nclient.core.PhaseFactory;
-import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-
-public class TCPConnection implements TransportConnection
-{
- private static final Logger _logger = Logger.getLogger(TCPConnection.class);
- private BrokerDetails _brokerDetails;
- private IoConnector _ioConnector;
- private Phase _phase;
- private PhaseContext _ctx;
-
- protected TCPConnection(ConnectionURL url, PhaseContext ctx)
- {
- _brokerDetails = url.getBrokerDetails(0);
- _ctx = ctx;
-
- ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS));
-
- // the MINA default is currently to use the pooled allocator although this may change in future
- // once more testing of the performance of the simple allocator has been done
- if (ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_POOLED_ALLOCATOR))
- {
- // Not sure what the original code wanted use :)
- }
- else
- {
- _logger.info("Using SimpleByteBufferAllocator");
- ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
- }
-
- _ioConnector = new SocketConnector();
- SocketConnectorConfig cfg = (SocketConnectorConfig) _ioConnector.getDefaultConfig();
-
- // if we do not use our own thread model we get the MINA default which is to use
- // its own leader-follower model
- if (ClientConfiguration.get().getBoolean(QpidConstants.USE_SHARED_READ_WRITE_POOL))
- {
- cfg.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
- scfg.setTcpNoDelay(ClientConfiguration.get().getBoolean(QpidConstants.TCP_NO_DELAY));
- scfg.setSendBufferSize(ClientConfiguration.get().getInt(QpidConstants.SEND_BUFFER_SIZE_IN_KB)*1024);
- scfg.setReceiveBufferSize(ClientConfiguration.get().getInt(QpidConstants.RECEIVE_BUFFER_SIZE_IN_KB)*1024);
- }
-
- // Returns the phase pipe
- public Phase connect() throws AMQPException
- {
- _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
- _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
-
- _phase = PhaseFactory.createPhasePipe(_ctx);
- _phase.start();
-
- return _phase;
- }
-
- public void close() throws AMQPException
- {
-
- }
-
- public Phase getPhasePipe()
- {
- return _phase;
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java
deleted file mode 100644
index 9df353a2df..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.transport;
-
-
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.Phase;
-
-public interface TransportConnection
-{
- public Phase connect()throws AMQPException;
-
- public void close()throws AMQPException;
-
- public Phase getPhasePipe();
-} \ No newline at end of file
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
deleted file mode 100644
index ce2145c08b..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.qpid.nclient.transport;
-
-import java.net.URISyntaxException;
-
-import org.apache.qpid.nclient.core.PhaseContext;
-
-public class TransportConnectionFactory
-{
- public enum ConnectionType
- {
- TCP,VM
- }
-
- public static TransportConnection createTransportConnection(String url,ConnectionType type, PhaseContext ctx) throws URISyntaxException
- {
- return createTransportConnection(new AMQPConnectionURL(url),type,ctx);
-
- }
-
- public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type, PhaseContext ctx)
- {
- switch (type)
- {
- case TCP : default:
- {
- return new TCPConnection(url,ctx);
- }
-
- case VM :
- {
- return new VMConnection(url,ctx);
- }
- }
-
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
deleted file mode 100644
index 911e855d4f..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.nclient.transport;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.transport.vmpipe.VmPipeAddress;
-import org.apache.mina.transport.vmpipe.VmPipeConnector;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
-import org.apache.qpid.nclient.config.ClientConfiguration;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.AbstractPhase;
-import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-import org.apache.qpid.ssl.BogusSSLContextFactory;
-
-/**
- * The Transport Phase corresponds to the Layer 1 in AMQP It works at the Frame
- * layer
- *
- */
-public class TransportPhase extends AbstractPhase implements IoHandler, ProtocolVersionList
-{
-
- private static final Logger _logger = Logger
- .getLogger(TransportPhase.class);
-
- private IoSession _ioSession;
- private BrokerDetails _brokerDetails;
-
- protected WriteFuture _lastWriteFuture;
-
- /**
- * ------------------------------------------------
- * Phase - methods introduced by Phase
- * ------------------------------------------------
- */
-
- public void start()throws AMQPException
- {
- _brokerDetails = (BrokerDetails)_ctx.getProperty(QpidConstants.AMQP_BROKER_DETAILS);
- IoConnector ioConnector = (IoConnector)_ctx.getProperty(QpidConstants.MINA_IO_CONNECTOR);
-
- final SocketAddress address;
- if (ioConnector instanceof VmPipeConnector)
- {
- address = new VmPipeAddress(_brokerDetails.getPort());
- }
- else
- {
- address = new InetSocketAddress(_brokerDetails.getHost(), _brokerDetails.getPort());
- _logger.info("Attempting connection to " + address);
-
- }
-
- ConnectFuture future = ioConnector.connect(address,this);
-
- // wait for connection to complete
- if (future.join(_brokerDetails.getTimeout()))
- {
- // we call getSession which throws an IOException if there has been an error connecting
- future.getSession();
- }
- else
- {
- throw new AMQPException("Timeout waiting for connection.");
- }
- }
-
- public void messageReceived(Object frame) throws AMQPException
- {
- super.messageReceived(frame);
- }
-
- public void messageSent(Object frame) throws AMQPException
- {
- _ioSession.write(frame);
- }
-
- /**
- * ------------------------------------------------
- * IoHandler - methods introduced by IoHandler
- * ------------------------------------------------
- */
- public void sessionIdle(IoSession session, IdleStatus status)
- throws Exception
- {
- _logger.debug("Protocol Session for [ " + this + " : " + session + "] idle: "
- + status);
- if (IdleStatus.WRITER_IDLE.equals(status))
- {
- // write heartbeat frame:
- _logger.debug("Sent heartbeat");
- session.write(HeartbeatBody.FRAME);
- // HeartbeatDiagnostics.sent();
- } else if (IdleStatus.READER_IDLE.equals(status))
- {
- // failover:
- // HeartbeatDiagnostics.timeout();
- _logger.warn("Timed out while waiting for heartbeat from peer.");
- session.close();
- }
- }
-
- public void messageReceived(IoSession session, Object message)
- throws Exception
- {
- AMQFrame frame = (AMQFrame) message;
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- if (bodyFrame instanceof HeartbeatBody)
- {
- _logger.debug("Received heartbeat");
- } else
- {
- messageReceived(frame);
- }
- }
-
- public void messageSent(IoSession session, Object message) throws Exception
- {
- _logger.debug("Sent frame " + message);
- }
-
- public void exceptionCaught(IoSession session, Throwable cause)
- throws Exception
- {
- // Need to handle failover
- _logger.info("Exception caught for [ " + this + " : Session " + System.identityHashCode(session) + "]",cause);
- //sessionClosed(session);
- }
-
- public void sessionClosed(IoSession session) throws Exception
- {
- // Need to handle failover
- _logger.info("Protocol Session for [ " + this + " : " + System.identityHashCode(session) + "] closed");
- }
-
- public void sessionCreated(IoSession session) throws Exception
- {
- _logger.info("Protocol session created for " + this + " session : "
- + System.identityHashCode(session));
-
- final ProtocolCodecFilter pcf = new ProtocolCodecFilter(
- new AMQCodecFactory(false));
-
- if (ClientConfiguration.get().getBoolean(
- QpidConstants.USE_SHARED_READ_WRITE_POOL))
- {
- session.getFilterChain().addBefore("AsynchronousWriteFilter",
- "protocolFilter", pcf);
- }
- else
- {
- session.getFilterChain().addLast("protocolFilter", pcf);
- }
- // we only add the SSL filter where we have an SSL connection
- if (_brokerDetails.useSSL())
- {
- // FIXME: Bogus context cannot be used in production.
- SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory
- .getInstance(false));
- sslFilter.setUseClientMode(true);
- session.getFilterChain().addBefore("protocolFilter", "ssl",
- sslFilter);
- }
-
- try
- {
-
- ReadWriteThreadModel threadModel = ReadWriteThreadModel
- .getInstance();
- threadModel.getAsynchronousReadFilter().createNewJobForSession(
- session);
- threadModel.getAsynchronousWriteFilter().createNewJobForSession(
- session);
- } catch (RuntimeException e)
- {
- e.printStackTrace();
- }
-
- _ioSession = session;
- doAMQPConnectionNegotiation();
- }
-
- public void sessionOpened(IoSession session) throws Exception
- {
- _logger.info("Protocol session opened for " + this + " : session "
- + System.identityHashCode(session));
- }
-
- /**
- * ----------------------------------------------------------
- * Protocol related methods
- * ----------------------------------------------------------
- */
- private void doAMQPConnectionNegotiation()
- {
- int i = pv.length - 1;
- _logger.debug("Engaging in connection negotiation");
- writeFrame(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
- }
-
- /**
- * ----------------------------------------------------------
- * Write Operations
- * ----------------------------------------------------------
- */
- public void writeFrame(AMQDataBlock frame)
- {
- writeFrame(frame, false);
- }
-
- public void writeFrame(AMQDataBlock frame, boolean wait)
- {
- WriteFuture f = _ioSession.write(frame);
- if (wait)
- {
- // fixme -- time out?
- f.join();
- } else
- {
- _lastWriteFuture = f;
- }
- }
-
- /**
- * -----------------------------------------------------------
- * Failover section
- * -----------------------------------------------------------
- */
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
deleted file mode 100644
index ba38848149..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java
+++ /dev/null
@@ -1,135 +0,0 @@
-package org.apache.qpid.nclient.transport;
-
-import java.io.IOException;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
-import org.apache.mina.transport.vmpipe.VmPipeAddress;
-import org.apache.mina.transport.vmpipe.VmPipeConnector;
-import org.apache.qpid.nclient.config.ClientConfiguration;
-import org.apache.qpid.nclient.core.AMQPException;
-import org.apache.qpid.nclient.core.DefaultPhaseContext;
-import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.core.PhaseContext;
-import org.apache.qpid.nclient.core.PhaseFactory;
-import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.pool.PoolingFilter;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
-
-public class VMConnection implements TransportConnection
-{
- private static final Logger _logger = Logger.getLogger(VMConnection.class);
- private BrokerDetails _brokerDetails;
- private IoConnector _ioConnector;
- private Phase _phase;
- private PhaseContext _ctx;
-
- protected VMConnection(ConnectionURL url,PhaseContext ctx)
- {
- _brokerDetails = url.getBrokerDetails(0);
- _ctx = ctx;
-
- _ioConnector = new VmPipeConnector();
- final IoServiceConfig cfg = _ioConnector.getDefaultConfig();
- ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
- PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService,
- "AsynchronousReadFilter");
- cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
- PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService,
- "AsynchronousWriteFilter");
- cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
- }
-
- public Phase connect() throws AMQPException
- {
- createVMBroker();
-
- _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
- _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
-
- _phase = PhaseFactory.createPhasePipe(_ctx);
- _phase.start();
-
- return _phase;
-
- }
-
- private void createVMBroker()throws AMQPException
- {
- _logger.info("Creating InVM Qpid.AMQP listening on port " + _brokerDetails.getPort());
-
- VmPipeAcceptor acceptor = new VmPipeAcceptor();
- IoServiceConfig config = acceptor.getDefaultConfig();
- config.setThreadModel(ReadWriteThreadModel.getInstance());
-
- IoHandlerAdapter provider = null;
- try
- {
- VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort());
- provider = createBrokerInstance(_brokerDetails.getPort());
- acceptor.bind(pipe, provider);
- _logger.info("Created InVM Qpid.AMQP listening on port " + _brokerDetails.getPort());
- }
- catch (IOException e)
- {
- _logger.error(e);
- VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort());
- acceptor.unbind(pipe);
-
- throw new AMQPException("Error creating VM broker",e);
- }
- }
-
- private IoHandlerAdapter createBrokerInstance(int port) throws AMQPException
- {
- String protocolProviderClass = ClientConfiguration.get().getString(QpidConstants.QPID_VM_BROKER_CLASS);
- _logger.info("Creating Qpid protocol provider: " + protocolProviderClass);
-
- // can't use introspection to get Provider as it is a server class.
- // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access.
-
- //get correct constructor and pass in instancec ID - "port"
- IoHandlerAdapter provider;
- try
- {
- Class[] cnstr = {Integer.class};
- Object[] params = {port};
- provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
- //Give the broker a second to create
- _logger.info("Created VMBroker Instance:" + port);
- }
- catch (Exception e)
- {
- _logger.info("Unable to create InVM Qpid broker on port " + port + ". due to : " + e.getCause());
- _logger.error(e);
- String because;
- if (e.getCause() == null)
- {
- because = e.toString();
- }
- else
- {
- because = e.getCause().toString();
- }
-
-
- throw new AMQPException(port, because + " Stopped InVM Qpid.AMQP creation",e);
- }
-
- return provider;
- }
-
- public void close() throws AMQPException
- {
-
- }
-
- public Phase getPhasePipe()
- {
- return _phase;
- }
-}
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java b/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java
deleted file mode 100644
index e161e30227..0000000000
--- a/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.qpid.nclient.util;
-
-import org.apache.qpid.nclient.core.AMQPException;
-
-public class AMQPValidator
-{
- public static void throwExceptionOnNull(Object obj, String msg) throws AMQPException
- {
- if(obj == null)
- {
- throw new AMQPException(msg);
- }
- }
-}