diff options
Diffstat (limited to 'java')
76 files changed, 2 insertions, 8180 deletions
diff --git a/java/common/protocol-version.xml b/java/common/protocol-version.xml index 8afc8a7de7..49cb2102b3 100644 --- a/java/common/protocol-version.xml +++ b/java/common/protocol-version.xml @@ -27,8 +27,8 @@ <property name="generated.dir" location="${generated.path}/${generated.package}" /> <property name="generated.timestamp" location="${generated.dir}/timestamp" /> <property name="xml.spec.dir" location="${topDirectoryLocation}/../specs" /> - <property name="xml.spec.deps" value="amqp.0-8.xml cluster.0-8.xml" /> - <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-8.xml ${xml.spec.dir}/cluster.0-8.xml" /> + <property name="xml.spec.deps" value="amqp.0-8.xml cluster.0-8.xml message.0-9.xml" /> + <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-8.xml ${xml.spec.dir}/cluster.0-8.xml ${xml.spec.dir}/message.0-9.xml" /> <target name="generate" depends="compile_generator,check_generate_deps" unless="generation.notRequired"> <mkdir dir="${generated.dir}"/> diff --git a/java/newclient/.project b/java/newclient/.project deleted file mode 100644 index 4d42311982..0000000000 --- a/java/newclient/.project +++ /dev/null @@ -1,17 +0,0 @@ -<projectDescription> - <name>qpid-newclient</name> - <comment/> - <projects> - <project>qpid-broker</project> - <project>qpid-common</project> - </projects> - <buildSpec> - <buildCommand> - <name>org.eclipse.jdt.core.javabuilder</name> - <arguments/> - </buildCommand> - </buildSpec> - <natures> - <nature>org.eclipse.jdt.core.javanature</nature> - </natures> -</projectDescription>
\ No newline at end of file diff --git a/java/newclient/pom.xml b/java/newclient/pom.xml deleted file mode 100644 index 5a82feb1d2..0000000000 --- a/java/newclient/pom.xml +++ /dev/null @@ -1,194 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - --> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-newclient</artifactId> - <packaging>jar</packaging> - <version>1.0-incubating-M2-SNAPSHOT</version> - <name>Qpid New Client</name> - <url>http://cwiki.apache.org/confluence/display/qpid</url> - - <parent> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid</artifactId> - <version>1.0-incubating-M2-SNAPSHOT</version> - </parent> - - <properties> - <topDirectoryLocation>..</topDirectoryLocation> - <java.source.version>1.5</java.source.version> - <qpid.version>${pom.version}</qpid.version> - <qpid.targetDir>${project.build.directory}</qpid.targetDir> - </properties> - - <dependencies> - - <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-common</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.geronimo.specs</groupId> - <artifactId>geronimo-jms_1.1_spec</artifactId> - </dependency> - - <dependency> - <groupId>commons-collections</groupId> - <artifactId>commons-collections</artifactId> - </dependency> - - <dependency> - <groupId>commons-configuration</groupId> - <artifactId>commons-configuration</artifactId> - <version>1.3</version> - </dependency> - - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.mina</groupId> - <artifactId>mina-filter-ssl</artifactId> - </dependency> - - <!-- Test Dependencies --> - <dependency> <!-- for inVm Broker --> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-broker</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>jmscts</groupId> - <artifactId>jmscts</artifactId> - <version>0.5-b2</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>jms</groupId> - <artifactId>jms</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.easymock</groupId> - <artifactId>easymockclassextension</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <systemProperties> - <property> - <name>amqj.noAutoCreateVMBroker</name> - <value>true</value> - </property> - <property> - <name>amqj.logging.level</name> - <value>${amqj.logging.level}</value> - </property> - <property> - <name>log4j.configuration</name> - <value>file:///${basedir}/src/main/java/client.log4j</value> - </property> - </systemProperties> - </configuration> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - - </plugins> - -<!-- The inclusion of this resource causes the build to hang. --> - <!--resources> - <resource> - <targetPath>META-INF/</targetPath> - <filtering>false</filtering> - <directory>../resources/META-INF</directory> - <includes> - <include>**</include> - </includes> - </resource> - </resources--> - - <testResources> - <testResource> - <targetPath>META-INF/</targetPath> - <filtering>false</filtering> - <directory>../resources/META-INF</directory> - <includes> - <include>**</include> - </includes> - </testResource> - <testResource> - <targetPath>src/</targetPath> - <filtering>false</filtering> - <directory>src/test/java</directory> - <includes> - <include>**/*.java</include> - </includes> - </testResource> - - <testResource> - <targetPath></targetPath> - <filtering>false</filtering> - <directory>src/main/java</directory> - <includes> - <include>client.log4j</include> - </includes> - </testResource> - </testResources> - - </build> - -</project> diff --git a/java/newclient/src/main/java/client.log4j b/java/newclient/src/main/java/client.log4j deleted file mode 100644 index c2c7326479..0000000000 --- a/java/newclient/src/main/java/client.log4j +++ /dev/null @@ -1,29 +0,0 @@ -#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-log4j.rootLogger=DEBUG
-
-
-#log4j.logger.org.apache.qpid=${amqj.logging.level}, console
-log4j.logger.org.apache.qpid=DEBUG, console
-log4j.additivity.org.apache.qpid=false
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=all
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java deleted file mode 100644 index a733fb7db7..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp; - -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQMethodBody; - -public abstract class AMQPCallBack -{ - private boolean _isComplete = false; - - public abstract void brokerResponded(AMQMethodBody body); - - public abstract void brokerRespondedWithError(AMQException e); - - public void setIsComplete(boolean isComplete) - { - _isComplete = isComplete; - } - - public boolean isComplete() - { - return _isComplete; - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java deleted file mode 100644 index f984e812c2..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp; - -import java.security.SecureRandom; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.QpidConstants; - -public abstract class AMQPCallBackSupport -{ - private SecureRandom _localCorrelationIdGenerator = new SecureRandom(); - protected ConcurrentHashMap<Long,AMQPCallBack> _cbMap = new ConcurrentHashMap<Long,AMQPCallBack>(); - - //the channelId assigned for this instance - protected int _channelId; - - public AMQPCallBackSupport(int channelId) - { - _channelId = channelId; - } - - private long getNextCorrelationId() - { - return _localCorrelationIdGenerator.nextLong(); - } - - - // For methods that still use nowait, hopefully they will remove nowait - protected AMQPMethodEvent handleNoWait(boolean noWait,AMQMethodBody methodBody,AMQPCallBack cb) - { - if(noWait) - { - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID); - return msg; - } - else - { - // u only need to register if u are expecting a response - long localCorrelationId = getNextCorrelationId(); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId); - _cbMap.put(localCorrelationId, cb); - return msg; - } - } - - protected AMQPMethodEvent handleAsynchronousCall(AMQMethodBody methodBody,AMQPCallBack cb) - { - long localCorrelationId = getNextCorrelationId(); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId); - _cbMap.put(localCorrelationId, cb); - return msg; - } - - protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody)throws AMQPException - { - if(_cbMap.containsKey(localCorrelationId)) - { - AMQPCallBack cb = (AMQPCallBack)_cbMap.get(localCorrelationId); - if(cb == null) - { - throw new AMQPException("Unable to find the callback object responsible for handling " + methodBody); - } - else - { - cb.setIsComplete(true); - cb.brokerResponded(methodBody); - } - _cbMap.remove(localCorrelationId); - } - else - { - //ignore, as this event is for another class instance - } - } - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java deleted file mode 100644 index cbb60b130d..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp; - -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.ChannelOkBody; -import org.apache.qpid.framing.ChannelOpenBody; -import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.ChannelResumeBody; -import org.apache.qpid.nclient.core.AMQPException; - -public interface AMQPChannel -{ - - /** - * Opens the channel - */ - public abstract ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException; - - /** - * Close the channel - */ - public abstract ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException; - - /** - * Channel Flow - */ - public abstract ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException; - - /** - * Close the channel - */ - public abstract ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException; - -}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java deleted file mode 100644 index 4976daa4fa..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp; - -import org.apache.qpid.nclient.amqp.event.AMQPEventManager; -import org.apache.qpid.nclient.amqp.qpid.QpidAMQPChannel; -import org.apache.qpid.nclient.amqp.qpid.QpidAMQPExchange; -import org.apache.qpid.nclient.amqp.qpid.QpidAMQPMessage; -import org.apache.qpid.nclient.amqp.qpid.QpidAMQPQueue; -import org.apache.qpid.nclient.amqp.state.AMQPStateManager; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.transport.ConnectionURL; -import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; -import org.apache.qpid.url.URLSyntaxException; - -public interface AMQPClassFactory -{ - - public abstract AMQPConnection createConnectionClass(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException; - - public abstract AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException; - - public abstract AMQPChannel createChannelClass(int channel) throws AMQPException; - - public abstract void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException; - - public abstract AMQPExchange createExchangeClass(int channel) throws AMQPException; - - public abstract void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException; - - public abstract AMQPQueue createQueueClass(int channel) throws AMQPException; - - public abstract void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException; - - public abstract AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException; - - public abstract void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException; - - /** - * Extention point - * Other interested parties can obtain a reference to the event manager - * and add listeners to get notified of events - * - */ - public abstract AMQPEventManager getEventManager(); - - /** - * Extention point - * Other interested parties can obtain a reference to the state manager - * and add listeners to get notified of state changes - * - */ - public abstract AMQPStateManager getStateManager(); - -}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java deleted file mode 100644 index b18fed5605..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp; - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.framing.ConnectionOpenBody; -import org.apache.qpid.framing.ConnectionOpenOkBody; -import org.apache.qpid.framing.ConnectionSecureOkBody; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ConnectionStartOkBody; -import org.apache.qpid.framing.ConnectionTuneOkBody; -import org.apache.qpid.nclient.core.AMQPException; - -public interface AMQPConnection -{ - - /** - * Opens the TCP connection and let the formalities begin. - */ - public abstract ConnectionStartBody openTCPConnection() throws AMQPException; - - /** - * The current java broker implementation can send a connection tune body - * as a response to the startOk. Not sure if that is the correct behaviour. - */ - public abstract AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException; - - /** - * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could - * issue a new challenge - */ - public abstract AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException; - - public abstract void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException; - - public abstract ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException; - - public abstract ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException; - -}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java deleted file mode 100644 index 53e11c48fb..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp; - -import org.apache.qpid.framing.DtxCoordinationCommitBody; -import org.apache.qpid.framing.DtxCoordinationForgetBody; -import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody; -import org.apache.qpid.framing.DtxCoordinationPrepareBody; -import org.apache.qpid.framing.DtxCoordinationRecoverBody; -import org.apache.qpid.framing.DtxCoordinationRollbackBody; -import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody; -import org.apache.qpid.nclient.core.AMQPException; - -public interface AMQPDtxCoordination -{ - public void commit(DtxCoordinationCommitBody dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException; - - public void forget(DtxCoordinationForgetBody dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException; - - public void getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException; - - public void prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException; - - public void recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException; - - public void rollback(DtxCoordinationRollbackBody dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException; - - public void setTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException; -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxDemarcation.java deleted file mode 100644 index 41f1414205..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxDemarcation.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp; - -import org.apache.qpid.framing.DtxDemarcationEndBody; -import org.apache.qpid.framing.DtxDemarcationEndOkBody; -import org.apache.qpid.framing.DtxDemarcationSelectBody; -import org.apache.qpid.framing.DtxDemarcationSelectOkBody; -import org.apache.qpid.framing.DtxDemarcationStartBody; -import org.apache.qpid.framing.DtxDemarcationStartOkBody; -import org.apache.qpid.nclient.core.AMQPException; - -public interface AMQPDtxDemarcation -{ - public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException; - - public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException; - - public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException; -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java deleted file mode 100644 index c2f93b8f42..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp; - -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeleteBody; -import org.apache.qpid.nclient.core.AMQPException; - -public interface AMQPExchange -{ - - /** - * ----------------------------------------------- - * API Methods - * ----------------------------------------------- - */ - public abstract void declare(ExchangeDeclareBody exchangeDeclareBody, AMQPCallBack cb) throws AMQPException; - - public abstract void delete(ExchangeDeleteBody exchangeDeleteBody, AMQPCallBack cb) throws AMQPException; - -}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java deleted file mode 100644 index 61c5825fe0..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp; - -import org.apache.qpid.framing.MessageAppendBody; -import org.apache.qpid.framing.MessageCancelBody; -import org.apache.qpid.framing.MessageCheckpointBody; -import org.apache.qpid.framing.MessageCloseBody; -import org.apache.qpid.framing.MessageConsumeBody; -import org.apache.qpid.framing.MessageGetBody; -import org.apache.qpid.framing.MessageOffsetBody; -import org.apache.qpid.framing.MessageOkBody; -import org.apache.qpid.framing.MessageOpenBody; -import org.apache.qpid.framing.MessageQosBody; -import org.apache.qpid.framing.MessageRecoverBody; -import org.apache.qpid.framing.MessageRejectBody; -import org.apache.qpid.framing.MessageResumeBody; -import org.apache.qpid.framing.MessageTransferBody; -import org.apache.qpid.nclient.core.AMQPException; - -public interface AMQPMessage -{ - - /** - * ----------------------------------------------- - * API Methods - * ----------------------------------------------- - */ - - public abstract void transfer(MessageTransferBody messageTransferBody, AMQPCallBack cb) throws AMQPException; - - public abstract void consume(MessageConsumeBody messageConsumeBody, AMQPCallBack cb) throws AMQPException; - - public abstract void cancel(MessageCancelBody messageCancelBody, AMQPCallBack cb) throws AMQPException; - - public abstract void get(MessageGetBody messageGetBody, AMQPCallBack cb) throws AMQPException; - - public abstract void recover(MessageRecoverBody messageRecoverBody, AMQPCallBack cb) throws AMQPException; - - public abstract void open(MessageOpenBody messageOpenBody, AMQPCallBack cb) throws AMQPException; - - public abstract void close(MessageCloseBody messageCloseBody, AMQPCallBack cb) throws AMQPException; - - public abstract void append(MessageAppendBody messageAppendBody, AMQPCallBack cb) throws AMQPException; - - public abstract void checkpoint(MessageCheckpointBody messageCheckpointBody, AMQPCallBack cb) throws AMQPException; - - public abstract void resume(MessageResumeBody messageResumeBody, AMQPCallBack cb) throws AMQPException; - - public abstract void qos(MessageQosBody messageQosBody, AMQPCallBack cb) throws AMQPException; - - /** - * The correlationId from the request. - * For example if a message.transfer is sent with correlationId "ABCD" - * then u need to pass that in. This correlation id is used by the execution layer - * to handle the correlation of method requests and responses - */ - public abstract void ok(MessageOkBody messageOkBody, long correlationId) throws AMQPException; - - /** - * The correlationId from the request. - * For example if a message.transfer is sent with correlationId "ABCD" - * then u need to pass that in. This correlation id is used by the execution layer - * to handle the correlation of method requests and responses - */ - public abstract void reject(MessageRejectBody messageRejectBody, long correlationId) throws AMQPException; - - /** - * The correlationId from the request. - * For example if a message.resume is sent with correlationId "ABCD" - * then u need to pass that in. This correlation id is used by the execution layer - * to handle the correlation of method requests and responses - */ - public abstract void offset(MessageOffsetBody messageOffsetBody, long correlationId) throws AMQPException; - -}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java deleted file mode 100644 index c10d6975c6..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp; - -import org.apache.qpid.framing.MessageAppendBody; -import org.apache.qpid.framing.MessageCheckpointBody; -import org.apache.qpid.framing.MessageCloseBody; -import org.apache.qpid.framing.MessageOpenBody; -import org.apache.qpid.framing.MessageRecoverBody; -import org.apache.qpid.framing.MessageResumeBody; -import org.apache.qpid.framing.MessageTransferBody; -import org.apache.qpid.nclient.core.AMQPException; - -/** - * This class also represents the AMQP Message class. - * You need an instance per channel. - * This is passed in as an argument in the constructor of an AMQPMessage instance. - * A client who implements this interface is notified When the broker issues - * Message class methods on the client. - * - * A Client should use the AMQPMessage class when it wants to issue Message class - * methods on the broker. - * - * A JMS MessageConsumer implementation can implement this interface and map - * AMQP Method notifications to the appropriate JMS methods. - * - * Simillarly a JMS MessageProducer implementation can wrap an AMQPMessage instance. - * - */ - -public interface AMQPMessageCallBack -{ - /** - * ----------------------------------------------------------------------- - * This provides Notifications for broker initiated Message class methods. - * All methods have a correlationId that u need to pass into - * the corresponding Message methods when responding to the broker. - * - * For example the correlationID passed in from Message.trasnfer - * should be passed back when u call Message.ok in AMQPMessage - * ----------------------------------------------------------------------- - */ - - - public void transfer(MessageTransferBody messageTransferBody,long correlationId) throws AMQPException; - - public void recover(MessageRecoverBody messageRecoverBody,long correlationId) throws AMQPException; - - public void open(MessageOpenBody messageOpenBody,long correlationId) throws AMQPException ; - - public void close(MessageCloseBody messageCloseBody,long correlationId) throws AMQPException; - - public void append(MessageAppendBody messageAppendBody,long correlationId) throws AMQPException; - - public void checkpoint(MessageCheckpointBody messageCheckpointBody,long correlationId) throws AMQPException; - - public void resume(MessageResumeBody messageResumeBody,long correlationId) throws AMQPException; -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java deleted file mode 100644 index ff26c6adf5..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp; - -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.QueuePurgeBody; -import org.apache.qpid.framing.QueueUnbindBody; -import org.apache.qpid.nclient.core.AMQPException; - -public interface AMQPQueue -{ - - /** - * ----------------------------------------------- - * API Methods - * ----------------------------------------------- - */ - public abstract void declare(QueueDeclareBody queueDeclareBody, AMQPCallBack cb) throws AMQPException; - - public abstract void bind(QueueBindBody queueBindBody, AMQPCallBack cb) throws AMQPException; - - // Queue.unbind doesn't have nowait - public abstract void unbind(QueueUnbindBody queueUnbindBody, AMQPCallBack cb) throws AMQPException; - - public abstract void purge(QueuePurgeBody queuePurgeBody, AMQPCallBack cb) throws AMQPException; - - public abstract void delete(QueueDeleteBody queueDeleteBody, AMQPCallBack cb) throws AMQPException; - -}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java deleted file mode 100644 index 6d0e83bb7e..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp; - -import org.apache.qpid.nclient.config.ClientConfiguration; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.QpidConstants; - -public class AbstractAMQPClassFactory -{ - public static AMQPClassFactory getFactoryInstance() throws AMQPException - { - String className = ClientConfiguration.get().getString(QpidConstants.AMQP_CLASS_FACTORY); - try - { - return (AMQPClassFactory)Class.forName(className).newInstance(); - } - catch(Exception e) - { - throw new AMQPException("Error creating AMQPClassFactory",e); - } - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java deleted file mode 100644 index f682659f9e..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java +++ /dev/null @@ -1,11 +0,0 @@ -package org.apache.qpid.nclient.amqp.event; - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.nclient.core.AMQPException; - -public interface AMQPEventManager -{ - public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l); - public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l); - public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException; -}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java deleted file mode 100644 index c6641890e0..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java +++ /dev/null @@ -1,78 +0,0 @@ -package org.apache.qpid.nclient.amqp.event; - -import org.apache.qpid.framing.AMQMethodBody; - -/** - * This class is exactly the same as the AMQMethod event. - * Except I renamed requestId to corelationId, so I could use it both ways. - * - * I didn't want to modify anything in common so that there is no - * impact on the existing code. - * - */ -public class AMQPMethodEvent<M extends AMQMethodBody> -{ - - private final M _method; - - private final int _channelId; - - /** - * This is the rquest id from the broker when it sent me a request - * when I respond I remember this id and copy this to the outgoing - * response. - */ - private final long _correlationId; - - /** - * I could use _correlationId, bcos when I send a request - * this field is blank and is only used internally. But I - * used a seperate field to make it more clear. - */ - private long _localCorrletionId = 0; - - public AMQPMethodEvent(int channelId, M method, long correlationId, long localCorrletionId) - { - _channelId = channelId; - _method = method; - _correlationId = correlationId; - _localCorrletionId = localCorrletionId; - } - - public AMQPMethodEvent(int channelId, M method, long correlationId) - { - _channelId = channelId; - _method = method; - _correlationId = correlationId; - } - - public M getMethod() - { - return _method; - } - - public int getChannelId() - { - return _channelId; - } - - public long getCorrelationId() - { - return _correlationId; - } - - public long getLocalCorrelationId() - { - return _localCorrletionId; - } - - public String toString() - { - StringBuilder buf = new StringBuilder("Method event: \n"); - buf.append("Channel id: \n").append(_channelId); - buf.append("Method: \n").append(_method); - buf.append("Request Id: ").append(_correlationId); - buf.append("Local Correlation Id: ").append(_localCorrletionId); - return buf.toString(); - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java deleted file mode 100644 index e77a38121c..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java +++ /dev/null @@ -1,11 +0,0 @@ -package org.apache.qpid.nclient.amqp.event; - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.nclient.core.AMQPException; - -public interface AMQPMethodListener -{ - - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException; - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java deleted file mode 100644 index decb120796..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java +++ /dev/null @@ -1,368 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.nclient.amqp.qpid; - -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.log4j.Logger; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.ChannelOkBody; -import org.apache.qpid.framing.ChannelOpenBody; -import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.ChannelResumeBody; -import org.apache.qpid.nclient.amqp.AMQPChannel; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; -import org.apache.qpid.nclient.amqp.state.AMQPState; -import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; -import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; -import org.apache.qpid.nclient.amqp.state.AMQPStateManager; -import org.apache.qpid.nclient.amqp.state.AMQPStateType; -import org.apache.qpid.nclient.config.ClientConfiguration; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.util.AMQPValidator; - -/** - * This represents the Channel class defined in the AMQP protocol. This class is a finite state machine and is thread - * safe by design. Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. Only - * one thread can enter the methods that change state, at a given time. The AMQP protocol recommends one thread per - * channel by design. - * - * A JMS Session can wrap an instance of this class. - */ - -public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListener, AMQPChannel -{ - private static final Logger _logger = Logger.getLogger(QpidAMQPChannel.class); - - // the channelId assigned for this channel - private int _channelId; - - private Phase _phase; - - private AMQPState _currentState; - - private AMQPStateManager _stateManager; - - private final AMQPState[] _validCloseStates = new AMQPState[] - { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND }; - - private final AMQPState[] _validResumeStates = new AMQPState[] - { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED }; - - // The wait period until a server sends a respond - private long _serverTimeOut = 1000; - - private final Lock _lock = new ReentrantLock(); - - private final Condition _channelNotOpend = _lock.newCondition(); - - private final Condition _channelNotClosed = _lock.newCondition(); - - private final Condition _channelFlowNotResponded = _lock.newCondition(); - - private final Condition _channelNotResumed = _lock.newCondition(); - - private ChannelOpenOkBody _channelOpenOkBody; - - private ChannelCloseOkBody _channelCloseOkBody; - - private ChannelFlowOkBody _channelFlowOkBody; - - private ChannelOkBody _channelOkBody; - - private ChannelCloseBody _channelCloseBody; - - protected QpidAMQPChannel(int channelId, Phase phase, AMQPStateManager stateManager) - { - _channelId = channelId; - _phase = phase; - _stateManager = stateManager; - _currentState = AMQPState.CHANNEL_NOT_OPENED; - _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); - } - - /** - * ------------------------------------------- - * API Methods - * -------------------------------------------- - */ - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPChannel#open(org.apache.qpid.framing.ChannelOpenBody) - */ - public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException - { - _lock.lock(); - try - { - _channelOpenOkBody = null; - checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _channelNotOpend.await(); - checkIfChannelClosed(); - AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time"); - notifyState(AMQPState.CHANNEL_OPENED); - _currentState = AMQPState.CHANNEL_OPENED; - return _channelOpenOkBody; - } - catch (Exception e) - { - throw new AMQPException("Error in channel.open", e); - } - finally - { - _lock.unlock(); - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPChannel#close(org.apache.qpid.framing.ChannelCloseBody) - */ - public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException - { - _lock.lock(); - try - { - _channelCloseOkBody = null; - checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _channelNotClosed.await(); - AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time"); - notifyState(AMQPState.CHANNEL_CLOSED); - _currentState = AMQPState.CHANNEL_CLOSED; - return _channelCloseOkBody; - } - catch (Exception e) - { - throw new AMQPException("Error in channel.close", e); - } - finally - { - _lock.unlock(); - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPChannel#flow(org.apache.qpid.framing.ChannelFlowBody) - */ - public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException - { - _lock.lock(); - try - { - _channelFlowOkBody = null; - if (channelFlowBody.active) - { - checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED); - } - else - { - checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND); - } - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _channelFlowNotResponded.await(); - checkIfChannelClosed(); - AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time"); - handleChannelFlowState(_channelFlowOkBody.active); - return _channelFlowOkBody; - } - catch (Exception e) - { - throw new AMQPException("Error in channel.flow", e); - } - finally - { - _lock.unlock(); - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPChannel#resume(org.apache.qpid.framing.ChannelResumeBody) - */ - public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException - { - _lock.lock(); - try - { - _channelOkBody = null; - checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _channelNotResumed.await(); - checkIfChannelClosed(); - AMQPValidator.throwExceptionOnNull(_channelOkBody, - "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time"); - notifyState(AMQPState.CHANNEL_OPENED); - _currentState = AMQPState.CHANNEL_OPENED; - return _channelOkBody; - } - catch (Exception e) - { - throw new AMQPException("Error in channel.resume", e); - } - finally - { - _lock.unlock(); - } - } - - /** - * ------------------------------------------- - * AMQPMethodListener methods - * -------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - _lock.lock(); - try - { - if (evt.getMethod() instanceof ChannelOpenOkBody) - { - _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod(); - _channelNotOpend.signal(); - return true; - } - else if (evt.getMethod() instanceof ChannelCloseOkBody) - { - _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod(); - _channelNotClosed.signal(); - return true; - } - else if (evt.getMethod() instanceof ChannelCloseBody) - { - _channelCloseBody = (ChannelCloseBody) evt.getMethod(); - // release the correct lock as u may have some conditions waiting. - // while an error occured and the broker has sent a close. - releaseLocks(); - handleChannelClose(_channelCloseBody); - return true; - } - else if (evt.getMethod() instanceof ChannelFlowOkBody) - { - _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod(); - _channelFlowNotResponded.signal(); - return true; - } - else if (evt.getMethod() instanceof ChannelFlowBody) - { - handleChannelFlow((ChannelFlowBody) evt.getMethod()); - return true; - } - else if (evt.getMethod() instanceof ChannelOkBody) - { - _channelOkBody = (ChannelOkBody) evt.getMethod(); - // In this case the only method expecting channel-ok is channel-resume - // haven't implemented ping and pong. - _channelNotResumed.signal(); - return true; - } - else - { - return false; - } - } - finally - { - _lock.unlock(); - } - } - - private void handleChannelClose(ChannelCloseBody channelCloseBody) throws AMQPException - { - notifyState(AMQPState.CHANNEL_CLOSED); - _currentState = AMQPState.CHANNEL_CLOSED; - // handle channel related cleanup - } - - private void releaseLocks() - { - if (_currentState == AMQPState.CHANNEL_NOT_OPENED) - { - _channelNotOpend.signal(); - _channelNotResumed.signal(); // It could be a channel.resume call - } - else if (_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND) - { - _channelFlowNotResponded.signal(); - } - else if (_currentState == AMQPState.CHANNEL_CLOSED) - { - _channelNotResumed.signal(); - } - } - - private void checkIfChannelClosed() throws AMQPException - { - if (_channelCloseBody != null) - { - String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() + " with reply code (" - + _channelCloseBody.getReplyCode() + ") " + "caused by class " + _channelCloseBody.getClassId() + " and method " - + _channelCloseBody.getMethod(); - - throw new AMQPException(error); - } - } - - private void handleChannelFlow(ChannelFlowBody channelFlowBody)throws AMQPException - { - _lock.lock(); - try - { - handleChannelFlowState(channelFlowBody.active); - } - finally - { - _lock.unlock(); - } - } - - private void handleChannelFlowState(boolean flow)throws AMQPException - { - notifyState((flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND); - _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND; - } - - private void notifyState(AMQPState newState) throws AMQPException - { - _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE)); - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java deleted file mode 100644 index 809aa57dab..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.qpid; - -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.ChannelOkBody; -import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.framing.ConnectionOpenOkBody; -import org.apache.qpid.framing.ConnectionSecureBody; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ConnectionTuneBody; -import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.ExchangeDeleteOkBody; -import org.apache.qpid.framing.MessageAppendBody; -import org.apache.qpid.framing.MessageCancelBody; -import org.apache.qpid.framing.MessageCheckpointBody; -import org.apache.qpid.framing.MessageCloseBody; -import org.apache.qpid.framing.MessageGetBody; -import org.apache.qpid.framing.MessageOffsetBody; -import org.apache.qpid.framing.MessageOkBody; -import org.apache.qpid.framing.MessageOpenBody; -import org.apache.qpid.framing.MessageQosBody; -import org.apache.qpid.framing.MessageRecoverBody; -import org.apache.qpid.framing.MessageRejectBody; -import org.apache.qpid.framing.MessageResumeBody; -import org.apache.qpid.framing.MessageTransferBody; -import org.apache.qpid.framing.QueueBindOkBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.QueuePurgeOkBody; -import org.apache.qpid.framing.QueueUnbindOkBody; -import org.apache.qpid.nclient.amqp.AMQPChannel; -import org.apache.qpid.nclient.amqp.AMQPClassFactory; -import org.apache.qpid.nclient.amqp.AMQPConnection; -import org.apache.qpid.nclient.amqp.AMQPExchange; -import org.apache.qpid.nclient.amqp.AMQPMessage; -import org.apache.qpid.nclient.amqp.AMQPMessageCallBack; -import org.apache.qpid.nclient.amqp.AMQPQueue; -import org.apache.qpid.nclient.amqp.event.AMQPEventManager; -import org.apache.qpid.nclient.amqp.state.AMQPStateManager; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.DefaultPhaseContext; -import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.core.PhaseContext; -import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.transport.AMQPConnectionURL; -import org.apache.qpid.nclient.transport.ConnectionURL; -import org.apache.qpid.nclient.transport.TransportConnection; -import org.apache.qpid.nclient.transport.TransportConnectionFactory; -import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; -import org.apache.qpid.url.URLSyntaxException; - -/** - * The Class Factory creates AMQP Class - * equivalents defined in the spec. - * - * There should one instance per connection. - * The factory class creates all the support - * classes and provides an instance of the - * AMQP class in ready-to-use state. - * - */ -public class QpidAMQPClassFactory implements AMQPClassFactory -{ - //Need an event manager per connection - private AMQPEventManager _eventManager = new QpidEventManager(); - - // Need a state manager per connection - private AMQPStateManager _stateManager = new QpidStateManager(); - - //Need a phase pipe per connection - private Phase _phase; - - //One instance per connection - private QpidAMQPConnection _amqpConnection; - - public QpidAMQPClassFactory() - { - - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createConnection(java.lang.String, org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType) - */ - public AMQPConnection createConnectionClass(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException - { - AMQPConnectionURL url = new AMQPConnectionURL(urlStr); - return createConnectionClass(url, type); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createConnectionClass(org.apache.qpid.nclient.transport.ConnectionURL, org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType) - */ - public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException - { - if (_amqpConnection == null) - { - PhaseContext ctx = new DefaultPhaseContext(); - ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager); - - TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx); - _amqpConnection = new QpidAMQPConnection(conn, _stateManager); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection); - _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection); - } - return _amqpConnection; - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createChannelClass(int) - */ - public AMQPChannel createChannelClass(int channel) throws AMQPException - { - checkIfConnectionStarted(); - QpidAMQPChannel amqpChannel = new QpidAMQPChannel(channel, _phase,_stateManager); - _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelCloseBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelFlowBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel); - _eventManager.addMethodEventListener(channel, ChannelOkBody.class, amqpChannel); - return amqpChannel; - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyChannelClass(int, org.apache.qpid.nclient.amqp.AMQPChannel) - */ - public void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException - { - _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel); - _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createExchangeClass(int) - */ - public AMQPExchange createExchangeClass(int channel) throws AMQPException - { - checkIfConnectionStarted(); - QpidAMQPExchange amqpExchange = new QpidAMQPExchange(channel, _phase); - _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange); - _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange); - return amqpExchange; - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryExchangeClass(int, org.apache.qpid.nclient.amqp.AMQPExchange) - */ - public void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException - { - _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange); - _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createQueueClass(int) - */ - public AMQPQueue createQueueClass(int channel) throws AMQPException - { - checkIfConnectionStarted(); - QpidAMQPQueue amqpQueue = new QpidAMQPQueue(channel, _phase); - _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue); - _eventManager.addMethodEventListener(channel, QueueBindOkBody.class, amqpQueue); - _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue); - _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue); - _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue); - return amqpQueue; - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyQueueClass(int, org.apache.qpid.nclient.amqp.AMQPQueue) - */ - public void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException - { - _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue); - _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessageCallBack) - */ - public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException - { - checkIfConnectionStarted(); - QpidAMQPMessage amqpMessage = new QpidAMQPMessage(channel, _phase, messageCb); - _eventManager.addMethodEventListener(channel, MessageAppendBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageCancelBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageCloseBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageGetBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageOffsetBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageOkBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageOpenBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageRecoverBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageRejectBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageResumeBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageQosBody.class, amqpMessage); - _eventManager.addMethodEventListener(channel, MessageTransferBody.class, amqpMessage); - - return amqpMessage; - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessage) - */ - public void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException - { - _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage); - _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage); - } - - //This class should register as a state listener for AMQPConnection - private void checkIfConnectionStarted() throws AMQPException - { - if (_phase == null) - { - _phase = _amqpConnection.getPhasePipe(); - - if (_phase == null) - { - throw new AMQPException("Cannot create a channel until connection is ready"); - } - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#getEventManager() - */ - public AMQPEventManager getEventManager() - { - return _eventManager; - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#getStateManager() - */ - public AMQPStateManager getStateManager() - { - return _stateManager; - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java deleted file mode 100644 index 9b4d776cc5..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java +++ /dev/null @@ -1,448 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.qpid; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.log4j.Logger; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ConnectionCloseOkBody; -import org.apache.qpid.framing.ConnectionOpenBody; -import org.apache.qpid.framing.ConnectionOpenOkBody; -import org.apache.qpid.framing.ConnectionSecureBody; -import org.apache.qpid.framing.ConnectionSecureOkBody; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ConnectionStartOkBody; -import org.apache.qpid.framing.ConnectionTuneBody; -import org.apache.qpid.framing.ConnectionTuneOkBody; -import org.apache.qpid.nclient.amqp.AMQPConnection; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; -import org.apache.qpid.nclient.amqp.state.AMQPState; -import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; -import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; -import org.apache.qpid.nclient.amqp.state.AMQPStateManager; -import org.apache.qpid.nclient.amqp.state.AMQPStateType; -import org.apache.qpid.nclient.config.ClientConfiguration; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.transport.TransportConnection; -import org.apache.qpid.nclient.util.AMQPValidator; - -/** - * This maps directly to the Connection class defined in the AMQP protocol This class is a finite state machine and is - * thread safe by design A particular method (state changing) can only be invoked once and only in sequence or else an - * IllegalStateTransitionException will be thrown Also only one thread can enter those methods at a given time. - */ -public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodListener, AMQPConnection -{ - private static final Logger _logger = Logger.getLogger(QpidAMQPConnection.class); - - private Phase _phase; - - private TransportConnection _connection; - - private long _correlationId; - - private AMQPState _currentState; - - private AMQPStateManager _stateManager; - - private final AMQPState[] _validCloseStates = new AMQPState[] - { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, - AMQPState.CONNECTION_OPEN, }; - - // The wait period until a server sends a respond - private long _serverTimeOut = 1000; - - private final Lock _lock = new ReentrantLock(); - - private final Condition _connectionNotStarted = _lock.newCondition(); - - private final Condition _connectionNotSecure = _lock.newCondition(); - - private final Condition _connectionNotTuned = _lock.newCondition(); - - private final Condition _connectionNotOpened = _lock.newCondition(); - - private final Condition _connectionNotClosed = _lock.newCondition(); - - private ConnectionStartBody _connectionStartBody; - - private ConnectionSecureBody _connectionSecureBody; - - private ConnectionTuneBody _connectionTuneBody; - - private ConnectionOpenOkBody _connectionOpenOkBody; - - private ConnectionCloseOkBody _connectionCloseOkBody; - - private ConnectionCloseBody _connectionCloseBody; - - protected QpidAMQPConnection(TransportConnection connection, AMQPStateManager stateManager) - { - _connection = connection; - _stateManager = stateManager; - _currentState = AMQPState.CONNECTION_UNDEFINED; - _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); - } - - /** - * ------------------------------------------- - * API Methods - * -------------------------------------------- - */ - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPConnection#openTCPConnection() - */ - public ConnectionStartBody openTCPConnection() throws AMQPException - { - _lock.lock(); - // open the TCP connection - try - { - _connectionStartBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED); - _phase = _connection.connect(); - - // waiting for ConnectionStartBody or error in connection - //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _connectionNotStarted.await(); - - checkIfConnectionClosed(); - AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time"); - notifyState(AMQPState.CONNECTION_NOT_STARTED); - _currentState = AMQPState.CONNECTION_NOT_STARTED; - return _connectionStartBody; - } - catch (InterruptedException e) - { - throw new AMQPException("Error opening connection to broker", e); - } - finally - { - _lock.unlock(); - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPConnection#startOk(org.apache.qpid.framing.ConnectionStartOkBody) - */ - public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException - { - _lock.lock(); - try - { - _connectionSecureBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId); - _phase.messageSent(msg); - // _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS); - _connectionNotSecure.await(); - - checkIfConnectionClosed(); - if (_connectionTuneBody != null) - { - notifyState(AMQPState.CONNECTION_NOT_TUNED); - _currentState = AMQPState.CONNECTION_NOT_TUNED; - return _connectionTuneBody; - } - else if (_connectionSecureBody != null) - { // oops the server sent another challenge - notifyState(AMQPState.CONNECTION_NOT_SECURE); - _currentState = AMQPState.CONNECTION_NOT_SECURE; - return _connectionSecureBody; - } - else - { - throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); - } - } - catch (InterruptedException e) - { - throw new AMQPException("Error in connection.startOk", e); - } - finally - { - _lock.unlock(); - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPConnection#secureOk(org.apache.qpid.framing.ConnectionSecureOkBody) - */ - public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException - { - _lock.lock(); - try - { - _connectionTuneBody = null; - _connectionSecureBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED); - - _connectionSecureBody = null; // The server could send a fresh challenge - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId); - _phase.messageSent(msg); - - //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _connectionNotTuned.await(); - checkIfConnectionClosed(); - - if (_connectionTuneBody != null) - { - notifyState(AMQPState.CONNECTION_NOT_TUNED); - _currentState = AMQPState.CONNECTION_NOT_TUNED; - return _connectionTuneBody; - } - else if (_connectionSecureBody != null) - { // oops the server sent another challenge - notifyState(AMQPState.CONNECTION_NOT_SECURE); - _currentState = AMQPState.CONNECTION_NOT_SECURE; - return _connectionSecureBody; - } - else - { - throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); - } - } - catch (InterruptedException e) - { - throw new AMQPException("Error in connection.secureOk", e); - } - finally - { - _lock.unlock(); - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPConnection#tuneOk(org.apache.qpid.framing.ConnectionTuneOkBody) - */ - public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException - { - _lock.lock(); - try - { - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED); - _connectionSecureBody = null; - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId); - _phase.messageSent(msg); - notifyState(AMQPState.CONNECTION_NOT_OPENED); - _currentState = AMQPState.CONNECTION_NOT_OPENED; - } - finally - { - _lock.unlock(); - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPConnection#open(org.apache.qpid.framing.ConnectionOpenBody) - */ - public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException - { - _lock.lock(); - try - { - // If the broker sends a connection close due to an error with the - // Connection tune ok, then this call will verify that - checkIfConnectionClosed(); - - _connectionOpenOkBody = null; - checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _connectionNotOpened.await(); - - checkIfConnectionClosed(); - AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time"); - notifyState(AMQPState.CONNECTION_OPEN); - _currentState = AMQPState.CONNECTION_OPEN; - return _connectionOpenOkBody; - } - catch (InterruptedException e) - { - throw new AMQPException("Error in connection.open", e); - } - finally - { - _lock.unlock(); - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPConnection#close(org.apache.qpid.framing.ConnectionCloseBody) - */ - public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException - { - _lock.lock(); - try - { - _connectionCloseOkBody = null; - checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED); - AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); - AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time"); - notifyState(AMQPState.CONNECTION_CLOSED); - _currentState = AMQPState.CONNECTION_CLOSED; - return _connectionCloseOkBody; - } - catch (InterruptedException e) - { - throw new AMQPException("Error in connection.close", e); - } - finally - { - _lock.unlock(); - } - } - - /** - * ------------------------------------------- - * AMQMethodListener methods - * -------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - _lock.lock(); - try - { - _correlationId = evt.getCorrelationId(); - - if (evt.getMethod() instanceof ConnectionStartBody) - { - _connectionStartBody = (ConnectionStartBody) evt.getMethod(); - _connectionNotStarted.signalAll(); - return true; - } - else if (evt.getMethod() instanceof ConnectionSecureBody) - { - _connectionSecureBody = (ConnectionSecureBody) evt.getMethod(); - _connectionNotSecure.signal(); - _connectionNotTuned.signal(); // in case the server has sent another chanllenge - return true; - } - else if (evt.getMethod() instanceof ConnectionTuneBody) - { - _connectionTuneBody = (ConnectionTuneBody) evt.getMethod(); - _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk - _connectionNotTuned.signal(); - return true; - } - else if (evt.getMethod() instanceof ConnectionOpenOkBody) - { - _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod(); - _connectionNotOpened.signal(); - return true; - } - else if (evt.getMethod() instanceof ConnectionCloseOkBody) - { - _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod(); - _connectionNotClosed.signal(); - return true; - } - else if (evt.getMethod() instanceof ConnectionCloseBody) - { - _connectionCloseBody = (ConnectionCloseBody) evt.getMethod(); - // release the correct lock as u may have some conditions waiting. - // while an error occured and the broker has sent a close. - releaseLocks(); - handleClose(); - return true; - } - else - { - return false; - } - } - finally - { - _lock.unlock(); - } - } - - - public Phase getPhasePipe() - { - return _phase; - } - - private void handleClose() throws AMQPException - { - try - { - notifyState(AMQPState.CONNECTION_CLOSING); - _currentState = AMQPState.CONNECTION_CLOSING; - // do the required cleanup and send a ConnectionCloseOkBody - } - catch (Exception e) - { - throw new AMQPException("Error handling connection.close from broker", e); - } - } - - private void checkIfConnectionClosed() throws AMQPException - { - if (_connectionCloseBody != null) - { - String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + " with reply code (" - + _connectionCloseBody.getReplyCode() + ") " + "caused by class " + _connectionCloseBody.getClassId() + " and method " - + _connectionCloseBody.getMethod(); - - throw new AMQPException(error); - } - } - - private void releaseLocks() - { - if (_currentState == AMQPState.CONNECTION_NOT_OPENED) - { - _connectionNotOpened.signal(); - } - else if (_currentState == AMQPState.CONNECTION_UNDEFINED) - { - _connectionNotStarted.signal(); - } - else if (_currentState == AMQPState.CONNECTION_NOT_STARTED) - { - _connectionNotSecure.signal(); - } - else if (_currentState == AMQPState.CONNECTION_NOT_SECURE) - { - _connectionNotTuned.signal(); - } - } - - private void notifyState(AMQPState newState) throws AMQPException - { - _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CONNECTION_STATE)); - } - -}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java deleted file mode 100644 index 70ea6bd27d..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.nclient.amqp.qpid; - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.DtxCoordinationCommitBody; -import org.apache.qpid.framing.DtxCoordinationCommitOkBody; -import org.apache.qpid.framing.DtxCoordinationForgetBody; -import org.apache.qpid.framing.DtxCoordinationForgetOkBody; -import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody; -import org.apache.qpid.framing.DtxCoordinationGetTimeoutOkBody; -import org.apache.qpid.framing.DtxCoordinationPrepareBody; -import org.apache.qpid.framing.DtxCoordinationPrepareOkBody; -import org.apache.qpid.framing.DtxCoordinationRecoverBody; -import org.apache.qpid.framing.DtxCoordinationRecoverOkBody; -import org.apache.qpid.framing.DtxCoordinationRollbackBody; -import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody; -import org.apache.qpid.nclient.amqp.AMQPCallBack; -import org.apache.qpid.nclient.amqp.AMQPCallBackSupport; -import org.apache.qpid.nclient.amqp.AMQPDtxCoordination; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.Phase; - -public class QpidAMQPDtxCoordination extends AMQPCallBackSupport implements AMQPMethodListener, AMQPDtxCoordination -{ - private Phase _phase; - - protected QpidAMQPDtxCoordination(int channelId,Phase phase) - { - super(channelId); - _phase = phase; - } - - public void commit(DtxCoordinationCommitBody dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationCommitBody,cb); - _phase.messageSent(msg); - } - - public void forget(DtxCoordinationForgetBody dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationForgetBody,cb); - _phase.messageSent(msg); - } - - public void getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationGetTimeoutBody,cb); - _phase.messageSent(msg); - } - - public void rollback(DtxCoordinationRollbackBody dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationRollbackBody,cb); - _phase.messageSent(msg); - } - - public void prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationPrepareBody,cb); - _phase.messageSent(msg); - } - - public void recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationRecoverBody,cb); - _phase.messageSent(msg); - } - - public void setTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationSetTimeoutBody,cb); - _phase.messageSent(msg); - } - - /**------------------------------------------- - * AMQPMethodListener methods - *-------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - long localCorrelationId = evt.getLocalCorrelationId(); - AMQMethodBody methodBody = evt.getMethod(); - if ( methodBody instanceof DtxCoordinationCommitOkBody || - methodBody instanceof DtxCoordinationForgetOkBody || - methodBody instanceof DtxCoordinationGetTimeoutOkBody || - methodBody instanceof DtxCoordinationPrepareOkBody || - methodBody instanceof DtxCoordinationRecoverOkBody - ) - { - invokeCallBack(localCorrelationId,methodBody); - return true; - } - else - { - return false; - } - } - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java deleted file mode 100644 index fd7b6f8ec6..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.qpid; - -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.log4j.Logger; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.DtxDemarcationEndBody; -import org.apache.qpid.framing.DtxDemarcationEndOkBody; -import org.apache.qpid.framing.DtxDemarcationSelectBody; -import org.apache.qpid.framing.DtxDemarcationSelectOkBody; -import org.apache.qpid.framing.DtxDemarcationStartBody; -import org.apache.qpid.framing.DtxDemarcationStartOkBody; -import org.apache.qpid.nclient.amqp.AMQPDtxDemarcation; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; -import org.apache.qpid.nclient.amqp.state.AMQPState; -import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; -import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; -import org.apache.qpid.nclient.amqp.state.AMQPStateManager; -import org.apache.qpid.nclient.amqp.state.AMQPStateType; -import org.apache.qpid.nclient.config.ClientConfiguration; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.util.AMQPValidator; - -public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMethodListener, AMQPDtxDemarcation -{ - private static final Logger _logger = Logger.getLogger(QpidAMQPDtxDemarcation.class); - - // the channelId that will be used for transactions - private int _channelId; - - private Phase _phase; - - private AMQPState _currentState; - - private AMQPStateManager _stateManager; - - private final AMQPState[] _validEndStates = new AMQPState[] - { AMQPState.DTX_STARTED }; - - private final AMQPState[] _validStartStates = new AMQPState[] - { AMQPState.DTX_NOT_STARTED, AMQPState.DTX_END }; - - // The wait period until a server sends a respond - private long _serverTimeOut = 1000; - - private final Lock _lock = new ReentrantLock(); - - private final Condition _dtxNotSelected = _lock.newCondition(); - - private final Condition _dtxNotStarted = _lock.newCondition(); - - // maybe it needs a better name - private final Condition _dtxNotEnd = _lock.newCondition(); - - private DtxDemarcationSelectOkBody _dtxDemarcationSelectOkBody; - - private DtxDemarcationStartOkBody _dtxDemarcationStartOkBody; - - private DtxDemarcationEndOkBody _dtxDemarcationEndOkBody; - - protected QpidAMQPDtxDemarcation(int channelId, Phase phase, AMQPStateManager stateManager) - { - _channelId = channelId; - _phase = phase; - _stateManager = stateManager; - _currentState = AMQPState.DTX_CHANNEL_NOT_SELECTED; - _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); - } - - /** - * ------------------------------------------- - * API Methods - * -------------------------------------------- - */ - public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException - { - _lock.lock(); - try - { - _dtxDemarcationSelectOkBody = null; - checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, AMQPState.DTX_NOT_STARTED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, dtxDemarcationSelectBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_dtxNotSelected.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _dtxNotSelected.await(); - AMQPValidator.throwExceptionOnNull(_dtxDemarcationSelectOkBody, "The broker didn't send the DtxDemarcationSelectOkBody in time"); - notifyState(AMQPState.DTX_NOT_STARTED); - _currentState = AMQPState.DTX_NOT_STARTED; - return _dtxDemarcationSelectOkBody; - } - catch (Exception e) - { - throw new AMQPException("Error in dtx.select", e); - } - finally - { - _lock.unlock(); - } - } - - public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException - { - _lock.lock(); - try - { - _dtxDemarcationStartOkBody = null; - checkIfValidStateTransition(_validStartStates, _currentState, AMQPState.DTX_STARTED); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationStartOkBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_dtxNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _dtxNotStarted.await(); - AMQPValidator.throwExceptionOnNull(_dtxDemarcationStartOkBody, "The broker didn't send the DtxDemarcationStartOkBody in time"); - notifyState(AMQPState.DTX_STARTED); - _currentState = AMQPState.DTX_STARTED; - return _dtxDemarcationStartOkBody; - } - catch (Exception e) - { - throw new AMQPException("Error in dtx.start", e); - } - finally - { - _lock.unlock(); - } - } - - public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException - { - _lock.lock(); - try - { - _dtxDemarcationEndOkBody = null; - checkIfValidStateTransition(_validEndStates, _currentState, AMQPState.DTX_END); - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationEndOkBody, QpidConstants.EMPTY_CORRELATION_ID); - _phase.messageSent(msg); - - //_dtxNotEnd.await(_serverTimeOut, TimeUnit.MILLISECONDS); - _dtxNotEnd.await(); - AMQPValidator.throwExceptionOnNull(_dtxDemarcationEndOkBody, "The broker didn't send the DtxDemarcationEndOkBody in time"); - notifyState(AMQPState.DTX_END); - _currentState = AMQPState.DTX_END; - return _dtxDemarcationEndOkBody; - } - catch (Exception e) - { - throw new AMQPException("Error in dtx.start", e); - } - finally - { - _lock.unlock(); - } - } - - /** - * ------------------------------------------- - * AMQPMethodListener methods - * -------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - _lock.lock(); - try - { - if (evt.getMethod() instanceof DtxDemarcationSelectOkBody) - { - _dtxDemarcationEndOkBody = (DtxDemarcationEndOkBody) evt.getMethod(); - _dtxNotSelected.signal(); - return true; - } - else if (evt.getMethod() instanceof DtxDemarcationStartOkBody) - { - _dtxDemarcationStartOkBody = (DtxDemarcationStartOkBody) evt.getMethod(); - _dtxNotStarted.signal(); - return true; - } - else if (evt.getMethod() instanceof DtxDemarcationEndOkBody) - { - _dtxDemarcationEndOkBody = (DtxDemarcationEndOkBody) evt.getMethod(); - _dtxNotEnd.signal(); - return true; - } - else - { - return false; - } - } - finally - { - _lock.unlock(); - } - } - - private void notifyState(AMQPState newState) throws AMQPException - { - _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE)); - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java deleted file mode 100644 index 02b22e0755..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.nclient.amqp.qpid; - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.ExchangeDeleteBody; -import org.apache.qpid.framing.ExchangeDeleteOkBody; -import org.apache.qpid.nclient.amqp.AMQPCallBack; -import org.apache.qpid.nclient.amqp.AMQPCallBackSupport; -import org.apache.qpid.nclient.amqp.AMQPExchange; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.Phase; - -/** - * - * This class represents the Exchange class defined in AMQP. - * Each method takes an @see AMQPCallBack object if it wants to know - * the response from the broker to particular method. - * Clients can handle the reponse asynchronously or block for a response - * using AMQPCallBack.isComplete() periodically using a loop. - */ -public class QpidAMQPExchange extends AMQPCallBackSupport implements AMQPMethodListener, AMQPExchange -{ - private Phase _phase; - - protected QpidAMQPExchange(int channelId,Phase phase) - { - super(channelId); - _phase = phase; - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPExchange#declare(org.apache.qpid.framing.ExchangeDeclareBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void declare(ExchangeDeclareBody exchangeDeclareBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleNoWait(exchangeDeclareBody.nowait,exchangeDeclareBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPExchange#delete(org.apache.qpid.framing.ExchangeDeleteBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void delete(ExchangeDeleteBody exchangeDeleteBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleNoWait(exchangeDeleteBody.nowait,exchangeDeleteBody,cb); - _phase.messageSent(msg); - } - - - /**------------------------------------------- - * AMQPMethodListener methods - *-------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - long localCorrelationId = evt.getLocalCorrelationId(); - AMQMethodBody methodBody = evt.getMethod(); - if ( methodBody instanceof ExchangeDeclareOkBody || methodBody instanceof ExchangeDeleteOkBody) - { - invokeCallBack(localCorrelationId,methodBody); - return true; - } - else - { - return false; - } - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java deleted file mode 100644 index e40c3cefa2..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.qpid; - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.MessageAppendBody; -import org.apache.qpid.framing.MessageCancelBody; -import org.apache.qpid.framing.MessageCheckpointBody; -import org.apache.qpid.framing.MessageCloseBody; -import org.apache.qpid.framing.MessageConsumeBody; -import org.apache.qpid.framing.MessageEmptyBody; -import org.apache.qpid.framing.MessageGetBody; -import org.apache.qpid.framing.MessageOffsetBody; -import org.apache.qpid.framing.MessageOkBody; -import org.apache.qpid.framing.MessageOpenBody; -import org.apache.qpid.framing.MessageQosBody; -import org.apache.qpid.framing.MessageRecoverBody; -import org.apache.qpid.framing.MessageRejectBody; -import org.apache.qpid.framing.MessageResumeBody; -import org.apache.qpid.framing.MessageTransferBody; -import org.apache.qpid.nclient.amqp.AMQPCallBack; -import org.apache.qpid.nclient.amqp.AMQPCallBackSupport; -import org.apache.qpid.nclient.amqp.AMQPMessage; -import org.apache.qpid.nclient.amqp.AMQPMessageCallBack; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.Phase; - -/** - * This class represents the AMQP Message class. - * You need an instance of this class per channel. - * A @see AMQPMessageCallBack class is taken as an argument in the constructor. - * A client can use this class to issue Message class methods on the broker. - * When the broker issues Message class methods on the client, the client is notified - * via the AMQPMessageCallBack interface. - * - * A JMS Message producer implementation can wrap an instance if this and map - * JMS method calls to the appropriate AMQP methods. - * - * AMQPMessageCallBack can be implemented by the JMS MessageConsumer implementation. - * - */ -public class QpidAMQPMessage extends AMQPCallBackSupport implements AMQPMethodListener, AMQPMessage -{ - private Phase _phase; - private AMQPMessageCallBack _messageCb; - - protected QpidAMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb) - { - super(channelId); - _phase = phase; - _messageCb = messageCb; - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPMessage#transfer(org.apache.qpid.framing.MessageTransferBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - - public void transfer(MessageTransferBody messageTransferBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageTransferBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPMessage#consume(org.apache.qpid.framing.MessageConsumeBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void consume(MessageConsumeBody messageConsumeBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageConsumeBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPMessage#cancel(org.apache.qpid.framing.MessageCancelBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void cancel(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPMessage#get(org.apache.qpid.framing.MessageGetBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void get(MessageGetBody messageGetBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageGetBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPMessage#recover(org.apache.qpid.framing.MessageRecoverBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void recover(MessageRecoverBody messageRecoverBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageRecoverBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPMessage#open(org.apache.qpid.framing.MessageOpenBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void open(MessageOpenBody messageOpenBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageOpenBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPMessage#close(org.apache.qpid.framing.MessageCloseBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void close(MessageCloseBody messageCloseBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageCloseBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPMessage#append(org.apache.qpid.framing.MessageAppendBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void append(MessageAppendBody messageAppendBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageAppendBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPMessage#checkpoint(org.apache.qpid.framing.MessageCheckpointBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void checkpoint(MessageCheckpointBody messageCheckpointBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageCheckpointBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPMessage#resume(org.apache.qpid.framing.MessageResumeBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void resume(MessageResumeBody messageResumeBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageResumeBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPMessage#qos(org.apache.qpid.framing.MessageQosBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void qos(MessageQosBody messageQosBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(messageQosBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPMessage#ok(org.apache.qpid.framing.MessageOkBody, long) - */ - public void ok(MessageOkBody messageOkBody,long correlationId) throws AMQPException - { - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOkBody,correlationId); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPMessage#reject(org.apache.qpid.framing.MessageRejectBody, long) - */ - public void reject(MessageRejectBody messageRejectBody,long correlationId) throws AMQPException - { - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageRejectBody,correlationId); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPMessage#offset(org.apache.qpid.framing.MessageOffsetBody, long) - */ - public void offset(MessageOffsetBody messageOffsetBody,long correlationId) throws AMQPException - { - AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOffsetBody,correlationId); - _phase.messageSent(msg); - } - - /**------------------------------------------- - * AMQPMethodListener methods - *-------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - long localCorrelationId = evt.getLocalCorrelationId(); - AMQMethodBody methodBody = evt.getMethod(); - if ( methodBody instanceof MessageOkBody || - methodBody instanceof MessageRejectBody || - methodBody instanceof MessageEmptyBody) - { - invokeCallBack(localCorrelationId,methodBody); - return true; - } - else if (methodBody instanceof MessageTransferBody) - { - _messageCb.transfer((MessageTransferBody)methodBody, evt.getCorrelationId()); - return true; - } - else if (methodBody instanceof MessageAppendBody) - { - _messageCb.append((MessageAppendBody)methodBody, evt.getCorrelationId()); - return true; - } - else if (methodBody instanceof MessageOpenBody) - { - _messageCb.open((MessageOpenBody)methodBody, evt.getCorrelationId()); - return true; - } - else if (methodBody instanceof MessageCloseBody) - { - _messageCb.close((MessageCloseBody)methodBody, evt.getCorrelationId()); - return true; - } - else if (methodBody instanceof MessageCheckpointBody) - { - _messageCb.checkpoint((MessageCheckpointBody)methodBody, evt.getCorrelationId()); - return true; - } - else if (methodBody instanceof MessageRecoverBody) - { - _messageCb.recover((MessageRecoverBody)methodBody, evt.getCorrelationId()); - return true; - } - else if (methodBody instanceof MessageResumeBody) - { - _messageCb.resume((MessageResumeBody)methodBody, evt.getCorrelationId()); - return true; - } - else - { - return false; - } - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java deleted file mode 100644 index 323ff0cf06..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.qpid; - -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueBindOkBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.QueuePurgeBody; -import org.apache.qpid.framing.QueuePurgeOkBody; -import org.apache.qpid.framing.QueueUnbindBody; -import org.apache.qpid.framing.QueueUnbindOkBody; -import org.apache.qpid.nclient.amqp.AMQPCallBack; -import org.apache.qpid.nclient.amqp.AMQPCallBackSupport; -import org.apache.qpid.nclient.amqp.AMQPQueue; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.Phase; - -/** - * - * This class represents the Queue class defined in AMQP. - * Each method takes an @see AMQPCallBack object if it wants to know - * the response from the broker to a particular method. - * Clients can handle the reponse asynchronously or block for a response - * using AMQPCallBack.isComplete() periodically using a loop. - */ -public class QpidAMQPQueue extends AMQPCallBackSupport implements AMQPMethodListener, AMQPQueue -{ - private Phase _phase; - - protected QpidAMQPQueue(int channelId,Phase phase) - { - super(channelId); - _phase = phase; - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPQueue#declare(org.apache.qpid.framing.QueueDeclareBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void declare(QueueDeclareBody queueDeclareBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleNoWait(queueDeclareBody.nowait,queueDeclareBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPQueue#bind(org.apache.qpid.framing.QueueBindBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void bind(QueueBindBody queueBindBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleNoWait(queueBindBody.nowait,queueBindBody,cb); - _phase.messageSent(msg); - } - - // Queue.unbind doesn't have nowait - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPQueue#unbind(org.apache.qpid.framing.QueueUnbindBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void unbind(QueueUnbindBody queueUnbindBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleAsynchronousCall(queueUnbindBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPQueue#purge(org.apache.qpid.framing.QueuePurgeBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void purge(QueuePurgeBody queuePurgeBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleNoWait(queuePurgeBody.nowait,queuePurgeBody,cb); - _phase.messageSent(msg); - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.amqp.AMQPQueue#delete(org.apache.qpid.framing.QueueDeleteBody, org.apache.qpid.nclient.amqp.AMQPCallBack) - */ - public void delete(QueueDeleteBody queueDeleteBody,AMQPCallBack cb) throws AMQPException - { - AMQPMethodEvent msg = handleNoWait(queueDeleteBody.nowait,queueDeleteBody,cb); - _phase.messageSent(msg); - } - - - /**------------------------------------------- - * AMQPMethodListener methods - *-------------------------------------------- - */ - public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException - { - long localCorrelationId = evt.getLocalCorrelationId(); - AMQMethodBody methodBody = evt.getMethod(); - if ( methodBody instanceof QueueDeclareOkBody || - methodBody instanceof QueueBindOkBody || - methodBody instanceof QueueUnbindOkBody || - methodBody instanceof QueuePurgeOkBody || - methodBody instanceof QueueDeleteOkBody - ) - { - invokeCallBack(localCorrelationId,methodBody); - return true; - } - else - { - return false; - } - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java deleted file mode 100644 index 902c80fe77..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.qpid; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.log4j.Logger; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.nclient.amqp.event.AMQPEventManager; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; -import org.apache.qpid.nclient.core.AMQPException; - -/** - * This class registeres with the ModelPhase as a AMQMethodListener, - * to receive method events and then it distributes methods to other listerners - * using a filtering criteria. The criteria is channel id and method body class. - * The method listeners are added and removed dynamically - * - * <p/> - */ -public class QpidEventManager implements AMQPEventManager -{ - private static final Logger _logger = Logger.getLogger(QpidEventManager.class); - - private Map<Integer, Map> _channelMap = new ConcurrentHashMap<Integer, Map>(); - - /** - * ------------------------------------------------ - * methods introduced by AMQEventManager - * ------------------------------------------------ - */ - public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l) - { - Map<Class, List> _methodListenerMap; - if (_channelMap.containsKey(channelId)) - { - _methodListenerMap = _channelMap.get(channelId); - - } - else - { - _methodListenerMap = new ConcurrentHashMap<Class, List>(); - _channelMap.put(channelId, _methodListenerMap); - } - - List<AMQPMethodListener> _listeners; - if (_methodListenerMap.containsKey(clazz)) - { - _listeners = _methodListenerMap.get(clazz); - } - else - { - _listeners = new ArrayList<AMQPMethodListener>(); - _methodListenerMap.put(clazz, _listeners); - } - - _listeners.add(l); - - } - - public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l) - { - if (_channelMap.containsKey(channelId)) - { - Map<Class, List> _methodListenerMap = _channelMap.get(channelId); - - if (_methodListenerMap.containsKey(clazz)) - { - List<AMQPMethodListener> _listeners = _methodListenerMap.get(clazz); - _listeners.remove(l); - } - - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.nclient.model.AMQStateManager#methodReceived(org.apache.qpid.protocol.AMQMethodEvent) - */ - public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException - { - if (_channelMap.containsKey(evt.getChannelId())) - { - Map<Class, List> _methodListenerMap = _channelMap.get(evt.getChannelId()); - - if (_methodListenerMap.containsKey(evt.getMethod().getClass())) - { - - List<AMQPMethodListener> _listeners = _methodListenerMap.get(evt.getMethod().getClass()); - for (AMQPMethodListener l : _listeners) - { - l.methodReceived(evt); - } - - return (_listeners.size() > 0); - } - - } - - return false; - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java deleted file mode 100644 index 3fe1ee4cfd..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.qpid; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; -import org.apache.qpid.nclient.amqp.state.AMQPStateListener; -import org.apache.qpid.nclient.amqp.state.AMQPStateManager; -import org.apache.qpid.nclient.amqp.state.AMQPStateType; -import org.apache.qpid.nclient.core.AMQPException; - -public class QpidStateManager implements AMQPStateManager -{ - - private static final Logger _logger = Logger.getLogger(QpidStateManager.class); - - private Map<AMQPStateType, List<AMQPStateListener>> _listernerMap = new ConcurrentHashMap<AMQPStateType, List<AMQPStateListener>>(); - - public void addListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException - { - List<AMQPStateListener> list; - if(_listernerMap.containsKey(stateType)) - { - list = _listernerMap.get(stateType); - } - else - { - list = new ArrayList<AMQPStateListener>(); - _listernerMap.put(stateType, list); - } - list.add(l); - } - - public void removeListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException - { - if(_listernerMap.containsKey(stateType)) - { - List<AMQPStateListener> list = _listernerMap.get(stateType); - list.remove(l); - } - } - - public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException - { - - if(_listernerMap.containsKey(event.getStateType())) - { - List<AMQPStateListener> list = _listernerMap.get(event.getStateType()); - for(AMQPStateListener l: list) - { - l.stateChanged(event); - } - } - else - { - _logger.warn("There are no registered listerners for state type" + event.getStateType()); - } - } - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java deleted file mode 100644 index 31a0197ab1..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.sample; - -import org.apache.qpid.framing.MessageAppendBody; -import org.apache.qpid.framing.MessageCheckpointBody; -import org.apache.qpid.framing.MessageCloseBody; -import org.apache.qpid.framing.MessageOpenBody; -import org.apache.qpid.framing.MessageRecoverBody; -import org.apache.qpid.framing.MessageResumeBody; -import org.apache.qpid.framing.MessageTransferBody; -import org.apache.qpid.nclient.amqp.AMQPMessageCallBack; -import org.apache.qpid.nclient.core.AMQPException; - -public class MessageHelper implements AMQPMessageCallBack -{ - - public void append(MessageAppendBody messageAppendBody, long correlationId) throws AMQPException - { - // TODO Auto-generated method stub - - } - - public void checkpoint(MessageCheckpointBody messageCheckpointBody, long correlationId) throws AMQPException - { - // TODO Auto-generated method stub - - } - - public void close(MessageCloseBody messageCloseBody, long correlationId) throws AMQPException - { - // TODO Auto-generated method stub - - } - - public void open(MessageOpenBody messageOpenBody, long correlationId) throws AMQPException - { - // TODO Auto-generated method stub - - } - - public void recover(MessageRecoverBody messageRecoverBody, long correlationId) throws AMQPException - { - // TODO Auto-generated method stub - - } - - public void resume(MessageResumeBody messageResumeBody, long correlationId) throws AMQPException - { - // TODO Auto-generated method stub - - } - - public void transfer(MessageTransferBody messageTransferBody, long correlationId) throws AMQPException - { - System.out.println("The Broker has sent a message" + messageTransferBody.toString()); - } - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java deleted file mode 100644 index 908f0adee0..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.sample; - -import java.io.UnsupportedEncodingException; -import java.util.HashSet; -import java.util.StringTokenizer; - -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.security.AMQPCallbackHandler; -import org.apache.qpid.nclient.security.CallbackHandlerRegistry; -import org.apache.qpid.nclient.transport.ConnectionURL; - -public class SecurityHelper -{ - public static String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException - { - final String mechanisms = new String(availableMechanisms, "utf8"); - StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); - HashSet mechanismSet = new HashSet(); - while (tokenizer.hasMoreTokens()) - { - mechanismSet.add(tokenizer.nextToken()); - } - - String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); - StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " "); - while (prefTokenizer.hasMoreTokens()) - { - String mech = prefTokenizer.nextToken(); - if (mechanismSet.contains(mech)) - { - return mech; - } - } - return null; - } - - public static AMQPCallbackHandler createCallbackHandler(String mechanism, ConnectionURL url) - throws AMQPException - { - Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism); - try - { - Object instance = mechanismClass.newInstance(); - AMQPCallbackHandler cbh = (AMQPCallbackHandler) instance; - cbh.initialise(url); - return cbh; - } - catch (Exception e) - { - throw new AMQPException("Unable to create callback handler: " + e, e); - } - } - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java deleted file mode 100644 index 04714d7278..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.sample; - -import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; -import org.apache.qpid.nclient.amqp.state.AMQPStateListener; -import org.apache.qpid.nclient.core.AMQPException; - -public class StateHelper implements AMQPStateListener -{ - - public void stateChanged(AMQPStateChangedEvent event) throws AMQPException - { - String s = event.getStateType() + " changed state from " + - event.getOldState() + " to " + event.getNewState(); - - System.out.println(s); - - } - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java deleted file mode 100644 index 3dce1cde1e..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java +++ /dev/null @@ -1,454 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.sample; - -import java.util.StringTokenizer; -import java.util.UUID; - -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; - -import org.apache.qpid.AMQException; -import org.apache.qpid.common.ClientProperties; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.ChannelOpenBody; -import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.ConnectionOpenBody; -import org.apache.qpid.framing.ConnectionOpenOkBody; -import org.apache.qpid.framing.ConnectionSecureBody; -import org.apache.qpid.framing.ConnectionSecureOkBody; -import org.apache.qpid.framing.ConnectionStartBody; -import org.apache.qpid.framing.ConnectionStartOkBody; -import org.apache.qpid.framing.ConnectionTuneBody; -import org.apache.qpid.framing.ConnectionTuneOkBody; -import org.apache.qpid.framing.Content; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.MessageCancelBody; -import org.apache.qpid.framing.MessageConsumeBody; -import org.apache.qpid.framing.MessageGetBody; -import org.apache.qpid.framing.MessageTransferBody; -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.QueuePurgeBody; -import org.apache.qpid.framing.QueuePurgeOkBody; -import org.apache.qpid.nclient.amqp.AMQPCallBack; -import org.apache.qpid.nclient.amqp.AMQPChannel; -import org.apache.qpid.nclient.amqp.AMQPClassFactory; -import org.apache.qpid.nclient.amqp.AMQPConnection; -import org.apache.qpid.nclient.amqp.AMQPExchange; -import org.apache.qpid.nclient.amqp.AMQPMessage; -import org.apache.qpid.nclient.amqp.AMQPQueue; -import org.apache.qpid.nclient.amqp.AbstractAMQPClassFactory; -import org.apache.qpid.nclient.amqp.state.AMQPStateType; -import org.apache.qpid.nclient.transport.AMQPConnectionURL; -import org.apache.qpid.nclient.transport.ConnectionURL; -import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; - -/** - * This class illustrates the usage of the API - * Notes this is just a simple demo. - * - * I have used Helper classes to keep the code cleaner. - * Will break this into unit tests later on - */ - -@SuppressWarnings("unused") -public class TestClient -{ - private byte _major; - - private byte _minor; - - private ConnectionURL _url; - - private static int _channel = 2; - - // Need a Class factory per connection - private AMQPClassFactory _classFactory; - - private int _ticket; - - public AMQPConnection openConnection() throws Exception - { - _classFactory = AbstractAMQPClassFactory.getFactoryInstance(); - - //_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'"); - - _url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'"); - return _classFactory.createConnectionClass(_url, ConnectionType.TCP); - } - - public void handleConnectionNegotiation(AMQPConnection con) throws Exception - { - StateHelper stateHelper = new StateHelper(); - _classFactory.getStateManager().addListener(AMQPStateType.CONNECTION_STATE, stateHelper); - _classFactory.getStateManager().addListener(AMQPStateType.CHANNEL_STATE, stateHelper); - - //ConnectionStartBody - ConnectionStartBody connectionStartBody = con.openTCPConnection(); - _major = connectionStartBody.getMajor(); - _minor = connectionStartBody.getMinor(); - - FieldTable clientProperties = FieldTableFactory.newFieldTable(); - clientProperties.put(new AMQShortString(ClientProperties.instance.toString()), "Test"); // setting only the client id - - final String locales = new String(connectionStartBody.getLocales(), "utf8"); - final StringTokenizer tokenizer = new StringTokenizer(locales, " "); - - final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms()); - - SaslClient sc = Sasl.createSaslClient(new String[] - { mechanism }, null, "AMQP", "localhost", null, SecurityHelper.createCallbackHandler(mechanism, _url)); - - ConnectionStartOkBody connectionStartOkBody = ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, new AMQShortString( - tokenizer.nextToken()), new AMQShortString(mechanism), (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null)); - // ConnectionSecureBody - AMQMethodBody body = con.startOk(connectionStartOkBody); - ConnectionTuneBody connectionTuneBody; - - if (body instanceof ConnectionSecureBody) - { - ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody) body; - ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(_major, _minor, sc - .evaluateChallenge(connectionSecureBody.getChallenge())); - //Assuming the server is not going to send another challenge - connectionTuneBody = (ConnectionTuneBody) con.secureOk(connectionSecureOkBody); - - } - else - { - connectionTuneBody = (ConnectionTuneBody) body; - } - - // Using broker supplied values - ConnectionTuneOkBody connectionTuneOkBody = ConnectionTuneOkBody.createMethodBody(_major, _minor, connectionTuneBody.getChannelMax(), - connectionTuneBody.getFrameMax(), connectionTuneBody.getHeartbeat()); - con.tuneOk(connectionTuneOkBody); - - ConnectionOpenBody connectionOpenBody = ConnectionOpenBody.createMethodBody(_major, _minor, null, true, new AMQShortString(_url - .getVirtualHost())); - - ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody); - } - - public void handleChannelNegotiation() throws Exception - { - AMQPChannel channel = _classFactory.createChannelClass(_channel); - - ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1")); - ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody); - - //lets have some fun - ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false); - - ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody); - System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend")); - - channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true); - channelFlowOkBody = channel.flow(channelFlowBody); - System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend")); - } - - public void createExchange() throws Exception - { - AMQPExchange exchange = _classFactory.createExchangeClass(_channel); - - ExchangeDeclareBody exchangeDeclareBody = ExchangeDeclareBody.createMethodBody(_major, _minor, null, // arguments - false,//auto delete - false,// durable - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME), true, //internal - false,// nowait - false,// passive - _ticket, new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)); - - AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange"); - exchange.declare(exchangeDeclareBody, cb); - // Blocking for response - while (!cb.isComplete()) - { - } - } - - public void createAndBindQueue() throws Exception - { - AMQPQueue queue = _classFactory.createQueueClass(_channel); - - QueueDeclareBody queueDeclareBody = QueueDeclareBody.createMethodBody(_major, _minor, null, //arguments - false,//auto delete - false,// durable - false, //exclusive, - false, //nowait, - false, //passive, - new AMQShortString("MyTestQueue"), 0); - - AMQPCallBack cb = new AMQPCallBack() - { - - @Override - public void brokerResponded(AMQMethodBody body) - { - QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody) body; - System.out.println("[Broker has created the queue, " + "message count " + queueDeclareOkBody.getMessageCount() + "consumer count " - + queueDeclareOkBody.getConsumerCount() + "]\n"); - } - - @Override - public void brokerRespondedWithError(AMQException e) - { - } - - }; - - queue.declare(queueDeclareBody, cb); - //Blocking for response - while (!cb.isComplete()) - { - } - - QueueBindBody queueBindBody = QueueBindBody.createMethodBody(_major, _minor, null, //arguments - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange - false, //nowait - new AMQShortString("MyTestQueue"), //queue - new AMQShortString("RH"), //routingKey - 0 //ticket - ); - - cb = createCallBackWithMessage("Broker has bound the queue"); - queue.bind(queueBindBody, cb); - //Blocking for response - while (!cb.isComplete()) - { - } - } - - public void purgeQueue() throws Exception - { - AMQPQueue queue = _classFactory.createQueueClass(_channel); - - QueuePurgeBody queuePurgeBody = QueuePurgeBody.createMethodBody(_major, _minor, false, //nowait - new AMQShortString("MyTestQueue"), //queue - 0 //ticket - ); - - AMQPCallBack cb = new AMQPCallBack() - { - - @Override - public void brokerResponded(AMQMethodBody body) - { - QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody) body; - System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n"); - } - - @Override - public void brokerRespondedWithError(AMQException e) - { - } - - }; - - queue.purge(queuePurgeBody, cb); - //Blocking for response - while (!cb.isComplete()) - { - } - - } - - public void deleteQueue() throws Exception - { - AMQPQueue queue = _classFactory.createQueueClass(_channel); - - QueueDeleteBody queueDeleteBody = QueueDeleteBody.createMethodBody(_major, _minor, false, //ifEmpty - false, //ifUnused - false, //nowait - new AMQShortString("MyTestQueue"), //queue - 0 //ticket - ); - - AMQPCallBack cb = new AMQPCallBack() - { - - @Override - public void brokerResponded(AMQMethodBody body) - { - QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody) body; - System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n"); - } - - @Override - public void brokerRespondedWithError(AMQException e) - { - } - - }; - - queue.delete(queueDeleteBody, cb); - //Blocking for response - while (!cb.isComplete()) - { - } - - } - - public void publishAndSubscribe() throws Exception - { - AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper()); - MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor, new AMQShortString("myClient"),// destination - false, //exclusive - null, //filter - false, //noAck, - false, //noLocal, - new AMQShortString("MyTestQueue"), //queue - 0 //ticket - ); - - AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume"); - message.consume(messageConsumeBody, cb); - //Blocking for response - while (!cb.isComplete()) - { - } - - // Sending 5 messages serially - for (int i = 0; i < 5; i++) - { - cb = createCallBackWithMessage("Broker has accepted msg " + i); - message.transfer(createMessages("Test" + i), cb); - while (!cb.isComplete()) - { - } - } - - MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient")); - - AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel"); - message.cancel(messageCancelBody, cb2); - - } - - private MessageTransferBody createMessages(String content) throws Exception - { - FieldTable headers = FieldTableFactory.newFieldTable(); - headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + ""); - - MessageTransferBody messageTransferBody = MessageTransferBody.createMethodBody(_major, _minor, new AMQShortString("testApp"), //appId - headers, //applicationHeaders - new Content(Content.TypeEnum.INLINE_T, content.getBytes()), //body - new AMQShortString(""), //contentEncoding, - new AMQShortString("text/plain"), //contentType - new AMQShortString("testApp"), //correlationId - (short) 1, //deliveryMode non persistant - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination - new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange - 0l, //expiration - false, //immediate - false, //mandatory - new AMQShortString(UUID.randomUUID().toString()), //messageId - (short) 0, //priority - false, //redelivered - new AMQShortString("RH"), //replyTo - new AMQShortString("RH"), //routingKey, - "abc".getBytes(), //securityToken - 0, //ticket - System.currentTimeMillis(), //timestamp - new AMQShortString(""), //transactionId - 0l, //ttl, - new AMQShortString("Hello") //userId - ); - - return messageTransferBody; - - } - - public void publishAndGet() throws Exception - { - AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper()); - AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5"); - - MessageGetBody messageGetBody = MessageGetBody.createMethodBody(_major, _minor, new AMQShortString("myClient"), false, //noAck - new AMQShortString("MyTestQueue"), //queue - 0 //ticket - ); - - //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper()); - message.transfer(createMessages("Test"), cb); - while (!cb.isComplete()) - { - } - - cb = createCallBackWithMessage("Broker has accepted get"); - message.get(messageGetBody, cb); - } - - // Creates a gneric call back and prints the given message - private AMQPCallBack createCallBackWithMessage(final String msg) - { - AMQPCallBack cb = new AMQPCallBack() - { - - @Override - public void brokerResponded(AMQMethodBody body) - { - System.out.println(msg); - } - - @Override - public void brokerRespondedWithError(AMQException e) - { - } - - }; - - return cb; - } - - public static void main(String[] args) - { - TestClient test = new TestClient(); - try - { - AMQPConnection con = test.openConnection(); - test.handleConnectionNegotiation(con); - test.handleChannelNegotiation(); - test.createExchange(); - test.createAndBindQueue(); - test.publishAndSubscribe(); - test.purgeQueue(); - test.publishAndGet(); - test.deleteQueue(); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java deleted file mode 100644 index 18219abc46..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.state; - -/** - * States used in the AMQ protocol. Used by the finite state machine to determine - * valid responses. - */ -public class AMQPState -{ - private final int _id; - - private final String _name; - - private AMQPState(int id, String name) - { - _id = id; - _name = name; - } - - public String toString() - { - //return "AMQState: id = " + _id + " name: " + _name; - return _name; // looks better with loggin - } - - // Connection state - public static final AMQPState CONNECTION_UNDEFINED = new AMQPState(0, "CONNECTION_UNDEFINED"); - public static final AMQPState CONNECTION_NOT_STARTED = new AMQPState(1, "CONNECTION_NOT_STARTED"); - public static final AMQPState CONNECTION_NOT_SECURE = new AMQPState(2, "CONNECTION_NOT_SECURE"); - public static final AMQPState CONNECTION_NOT_TUNED = new AMQPState(2, "CONNECTION_NOT_TUNED"); - public static final AMQPState CONNECTION_NOT_OPENED = new AMQPState(3, "CONNECTION_NOT_OPENED"); - public static final AMQPState CONNECTION_OPEN = new AMQPState(4, "CONNECTION_OPEN"); - public static final AMQPState CONNECTION_CLOSING = new AMQPState(5, "CONNECTION_CLOSING"); - public static final AMQPState CONNECTION_CLOSED = new AMQPState(6, "CONNECTION_CLOSED"); - - // Channel state - public static final AMQPState CHANNEL_NOT_OPENED = new AMQPState(10, "CHANNEL_NOT_OPENED"); - public static final AMQPState CHANNEL_OPENED = new AMQPState(11, "CHANNEL_OPENED"); - public static final AMQPState CHANNEL_CLOSED = new AMQPState(11, "CHANNEL_CLOSED"); - public static final AMQPState CHANNEL_SUSPEND = new AMQPState(11, "CHANNEL_SUSPEND"); - - // Distributed Transaction state - public static final AMQPState DTX_CHANNEL_NOT_SELECTED = new AMQPState(10, "DTX_CHANNEL_NOT_SELECTED"); - public static final AMQPState DTX_NOT_STARTED = new AMQPState(10, "DTX_NOT_STARTED"); - public static final AMQPState DTX_STARTED = new AMQPState(10, "DTX_STARTED"); - public static final AMQPState DTX_END = new AMQPState(10, "DTX_END"); -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java deleted file mode 100644 index 506d267a9e..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.state; - -public class AMQPStateChangedEvent -{ - private AMQPState _oldState; - - private AMQPState _newState; - - private AMQPStateType _stateType; - - public AMQPStateChangedEvent(AMQPState oldState, AMQPState newState, AMQPStateType stateType) - { - _oldState = oldState; - _newState = newState; - _stateType = stateType; - } - - public AMQPState getNewState() - { - return _newState; - } - - public void setNewState(AMQPState newState) - { - this._newState = newState; - } - - public AMQPState getOldState() - { - return _oldState; - } - - public void setOldState(AMQPState oldState) - { - this._oldState = oldState; - } - - public AMQPStateType getStateType() - { - return _stateType; - } - - public void setStateType(AMQPStateType stateType) - { - this._stateType = stateType; - } - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java deleted file mode 100644 index 974f707504..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.apache.qpid.nclient.amqp.state; - -import org.apache.qpid.nclient.core.AMQPException; - -public interface AMQPStateListener -{ - public void stateChanged(AMQPStateChangedEvent event) throws AMQPException; -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java deleted file mode 100644 index c1fde7181d..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.apache.qpid.nclient.amqp.state; - -import org.apache.qpid.nclient.core.AMQPException; - -public class AMQPStateMachine -{ - protected void checkIfValidStateTransition(AMQPState correctState,AMQPState currentState,AMQPState requiredState) throws IllegalStateTransitionException - { - if (currentState != correctState) - { - throw new IllegalStateTransitionException(currentState,requiredState); - } - } - - protected void checkIfValidStateTransition(AMQPState[] correctStates,AMQPState currentState,AMQPState requiredState) throws IllegalStateTransitionException - { - for(AMQPState correctState :correctStates) - { - if (currentState == correctState) - { - return; - } - } - throw new IllegalStateTransitionException(currentState,requiredState); - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java deleted file mode 100644 index 9bc60b658e..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java +++ /dev/null @@ -1,13 +0,0 @@ -package org.apache.qpid.nclient.amqp.state; - -import org.apache.qpid.AMQException; -import org.apache.qpid.nclient.core.AMQPException; - -public interface AMQPStateManager -{ - public void addListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException; - - public void removeListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException; - - public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException; -}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java deleted file mode 100644 index e190d639f2..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.state; - -/** - * The Type of States used in the AMQ protocol. - * This allows to partition listeners by the type of states they want - * to listen rather than all. - * For example an Object might only be interested in Channel state - */ -public class AMQPStateType -{ - private final int _typeId; - - private final String _typeName; - - private AMQPStateType(int id, String name) - { - _typeId = id; - _typeName = name; - } - - public String toString() - { - return _typeName; - } - - // Connection state - public static final AMQPStateType CONNECTION_STATE = new AMQPStateType(0, "CONNECTION_STATE"); - public static final AMQPStateType CHANNEL_STATE = new AMQPStateType(1, "CHANNEL_STATE"); - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java deleted file mode 100644 index fdc24d1d2f..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.amqp.state; - -import org.apache.qpid.nclient.core.AMQPException; - -public class IllegalStateTransitionException extends AMQPException -{ - private AMQPState _currentState; - private AMQPState _desiredState; - - public IllegalStateTransitionException(AMQPState currentState, AMQPState desiredState) - { - super("No valid state transition defined from state " + currentState + - " to state " + desiredState); - _currentState = currentState; - _desiredState = desiredState; - } - - public AMQPState getCurrentState() - { - return _currentState; - } - - public AMQPState getDesiredState() - { - return _desiredState; - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java deleted file mode 100644 index 7bc77e02c0..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java +++ /dev/null @@ -1,100 +0,0 @@ -package org.apache.qpid.nclient.config; - -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.Collection; -import java.util.List; -import java.util.TreeMap; - -import javax.security.sasl.SaslClientFactory; - -import org.apache.commons.configuration.CombinedConfiguration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.SystemConfiguration; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.log4j.Logger; -import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.nclient.security.AMQPCallbackHandler; - -/** - * Loads a properties file from classpath. - * These values can be overwritten using system properties - */ -public class ClientConfiguration extends CombinedConfiguration { - - private static final Logger _logger = Logger.getLogger(ClientConfiguration.class); - private static ClientConfiguration _instance = new ClientConfiguration(); - - ClientConfiguration() - { - super(); - addConfiguration(new SystemConfiguration()); - try - { - XMLConfiguration config = new XMLConfiguration(); - config.load(getInputStream()); - addConfiguration(config); - } - catch (ConfigurationException e) - { - _logger.warn("Client Properties missing, using defaults",e); - } - } - - public static ClientConfiguration get() - { - return _instance; - } - - private InputStream getInputStream() - { - if (System.getProperty(QpidConstants.CONFIG_FILE_PATH) != null) - { - try - { - return new FileInputStream((String)System.getProperty(QpidConstants.CONFIG_FILE_PATH)); - } - catch(Exception e) - { - return this.getClass().getResourceAsStream("client.xml"); - } - } - else - { - return this.getClass().getResourceAsStream("client.xml"); - } - - } - - public static void main(String[] args) - { - String key = QpidConstants.AMQP_SECURITY + "." + - QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES + "." + - QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY; - - TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister = - new TreeMap<String, Class<? extends SaslClientFactory>>(); - - int index = ClientConfiguration.get().getMaxIndex(key); - - for (int i=0; i<index+1;i++) - { - String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]"); - String className = ClientConfiguration.get().getString(key + "(" + i + ")" ); - try - { - Class<?> clazz = Class.forName(className); - if (!(SaslClientFactory.class.isAssignableFrom(clazz))) - { - _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping"); - continue; - } - factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz); - } - catch (Exception ex) - { - _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping"); - } - } - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml deleted file mode 100644 index 6eeedbe1ff..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml +++ /dev/null @@ -1,37 +0,0 @@ -<?xml version="1.0" encoding="ISO-8859-1" ?> -<qpidClientConfig> - - <security> - <saslClientFactoryTypes> - <saslClientFactory type="AMQPLAIN">org.apache.qpid.nclient.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory> - </saslClientFactoryTypes> - <securityMechanisms> - <securityMechanismHandler type="PLAIN">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler> - <securityMechanismHandler type="CRAM_MD5">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler> - </securityMechanisms> - </security> - - <!-- Transport Layer properties --> - <useSharedReadWritePool>false</useSharedReadWritePool> - <enableDirectBuffers>true</enableDirectBuffers> - <enablePooledAllocator>false</enablePooledAllocator> - <tcpNoDelay>true</tcpNoDelay> - <sendBufferSizeInKb>32</sendBufferSizeInKb> - <reciveBufferSizeInKb>32</reciveBufferSizeInKb> - <qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass> - - <!-- Execution Layer properties --> - <maxAccumilatedResponses>20</maxAccumilatedResponses> - - <!-- Model Phase properties --> - <serverTimeoutInMilliSeconds>60000</serverTimeoutInMilliSeconds> - <amqpClassFactory>org.apache.qpid.nclient.amqp.qpid.QpidAMQPClassFactory</amqpClassFactory> - <maxAccumilatedResponses>20</maxAccumilatedResponses> - - <phasePipe> - <phase index="0">org.apache.qpid.nclient.transport.TransportPhase</phase> - <phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase</phase> - <phase index="2">org.apache.qpid.nclient.model.ModelPhase</phase> - </phasePipe> - -</qpidClientConfig>
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java deleted file mode 100644 index a029f7d4ff..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java +++ /dev/null @@ -1,55 +0,0 @@ -package org.apache.qpid.nclient.core; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; - - -public class AMQPException extends Exception -{ - private int _errorCode; - - public AMQPException(String message) - { - super(message); - } - - public AMQPException(String msg, Throwable t) - { - super(msg, t); - } - - public AMQPException(int errorCode, String msg, Throwable t) - { - super(msg + " [error code " + errorCode + ']', t); - _errorCode = errorCode; - } - - public AMQPException(int errorCode, String msg) - { - super(msg + " [error code " + errorCode + ']'); - _errorCode = errorCode; - } - - public AMQPException(Logger logger, String msg, Throwable t) - { - this(msg, t); - logger.error(getMessage(), this); - } - - public AMQPException(Logger logger, String msg) - { - this(msg); - logger.error(getMessage(), this); - } - - public AMQPException(Logger logger, int errorCode, String msg) - { - this(errorCode, msg); - logger.error(getMessage(), this); - } - - public int getErrorCode() - { - return _errorCode; - } - } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java deleted file mode 100644 index bf6a19b920..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.core; - -public abstract class AbstractPhase implements Phase { - - protected PhaseContext _ctx; - protected Phase _nextInFlowPhase; - protected Phase _nextOutFlowPhase; - - - /** - * ------------------------------------------------ - * Phase - method introduced by Phase - * ------------------------------------------------ - */ - public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase) { - _nextInFlowPhase = nextInFlowPhase; - _nextOutFlowPhase = nextOutFlowPhase; - _ctx = ctx; - } - - /** - * The start is called from the top - * of the pipe and is propogated the - * bottom. - * - * Each phase can override this to do - * any phase specific logic related - * pipe.start() - */ - public void start()throws AMQPException - { - if(_nextOutFlowPhase != null) - { - _nextOutFlowPhase.start(); - } - } - - /** - * Each phase can override this to do - * any phase specific cleanup - */ - public void close()throws AMQPException - { - - } - - public void messageReceived(Object frame) throws AMQPException - { - if(_nextInFlowPhase != null) - { - _nextInFlowPhase.messageReceived(frame); - } - } - - public void messageSent(Object frame) throws AMQPException - { - if (_nextOutFlowPhase != null) - { - _nextOutFlowPhase.messageSent(frame); - } - } - - public PhaseContext getPhaseContext() - { - return _ctx; - } - - public Phase getNextInFlowPhase() { - return _nextInFlowPhase; - } - - public Phase getNextOutFlowPhase() { - return _nextOutFlowPhase; - } - - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java deleted file mode 100644 index a3455cdacd..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.apache.qpid.nclient.core; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class DefaultPhaseContext implements PhaseContext -{ - public Map<String,Object> _props = new ConcurrentHashMap<String,Object>(); - - public Object getProperty(String name) - { - return _props.get(name); - } - - public void setProperty(String name, Object value) - { - _props.put(name, value); - } - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java deleted file mode 100644 index 7aa10b77ff..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.apache.qpid.nclient.core; - - -public interface Phase -{ - - /** - * This method is used to initialize a phase - * - * @param ctx - * @param nextInFlowPhase - * @param nextOutFlowPhase - */ - public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase); - - /** - * - * Implement logic related to physical opening - * of the pipe - */ - public void start()throws AMQPException; - - /** - * Implement cleanup in this method. - * This indicates the pipe is closing - */ - public void close()throws AMQPException; - - public void messageReceived(Object msg) throws AMQPException; - - public void messageSent(Object msg) throws AMQPException; - - public PhaseContext getPhaseContext(); - - public Phase getNextOutFlowPhase(); - - public Phase getNextInFlowPhase(); -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java deleted file mode 100644 index d5942fd785..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.core; - -/** - * This can be thought of as a session context associated - * with the pipe. This is transient and is scoped by the - * duration of the physical connection. - * - */ -public interface PhaseContext { - - public Object getProperty(String name); - - public void setProperty(String name, Object value); - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java deleted file mode 100644 index 9542aab344..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java +++ /dev/null @@ -1,63 +0,0 @@ -package org.apache.qpid.nclient.core; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.qpid.nclient.config.ClientConfiguration; - -public class PhaseFactory -{ - /** - * This method will create the pipe and return a reference - * to the top of the pipeline. - * - * The application can then use this (top most) phase and all - * calls will propogated down the pipe. - * - * Simillar calls orginating at the bottom of the pipeline - * will be propogated to the top. - * - * @param ctx - * @return - * @throws AMQPException - */ - public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException - { - String key = QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE; - Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>(); - List<String> list = ClientConfiguration.get().getList(key); - int index = 0; - for(String s:list) - { - try - { - Phase temp = (Phase)Class.forName(s).newInstance(); - phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + QpidConstants.INDEX),temp) ; - } - catch(Exception e) - { - throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e); - } - index++; - } - - Phase current = null; - Phase prev = null; - Phase next = null; - //Lets build the phase pipe. - for (int i=0; i<phaseMap.size();i++) - { - current = phaseMap.get(i); - if (i+1 < phaseMap.size()) - { - next = phaseMap.get(i+1); - } - current.init(ctx, next, prev); - prev = current; - next = null; - } - - return current; - } -}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java deleted file mode 100644 index 034fc28070..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java +++ /dev/null @@ -1,67 +0,0 @@ -package org.apache.qpid.nclient.core; - -public interface QpidConstants -{ - - // Common properties - public static long EMPTY_CORRELATION_ID = -1; - - public static int CHANNEL_ZERO = 0; - - public static String CONFIG_FILE_PATH = "ConfigFilePath"; - - // Phase Context properties - public final static String AMQP_BROKER_DETAILS = "AMQP_BROKER_DETAILS"; - - public final static String MINA_IO_CONNECTOR = "MINA_IO_CONNECTOR"; - - public final static String EVENT_MANAGER = "EVENT_MANAGER"; - - /**--------------------------------------------------------------- - * Configuration file properties - * ------------------------------------------------------------ - */ - - // Model Layer properties - public final static String SERVER_TIMEOUT_IN_MILLISECONDS = "serverTimeoutInMilliSeconds"; - public final static String AMQP_CLASS_FACTORY = "amqpClassFactory"; - - // MINA properties - public final static String USE_SHARED_READ_WRITE_POOL = "useSharedReadWritePool"; - - public final static String ENABLE_DIRECT_BUFFERS = "enableDirectBuffers"; - - public final static String ENABLE_POOLED_ALLOCATOR = "enablePooledAllocator"; - - public final static String TCP_NO_DELAY = "tcpNoDelay"; - - public final static String SEND_BUFFER_SIZE_IN_KB = "sendBufferSizeInKb"; - - public final static String RECEIVE_BUFFER_SIZE_IN_KB = "reciveBufferSizeInKb"; - - // Security properties - public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES = "saslClientFactoryTypes"; - - public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY = "saslClientFactory"; - - public final static String TYPE = "[@type]"; - - public final static String AMQP_SECURITY = "security"; - - public final static String AMQP_SECURITY_MECHANISMS = "securityMechanisms"; - - public final static String AMQP_SECURITY_MECHANISM_HANDLER = "securityMechanismHandler"; - - // Execution Layer properties - public final static String MAX_ACCUMILATED_RESPONSES = "maxAccumilatedResponses"; - - //Transport Layer properties - public final static String QPID_VM_BROKER_CLASS = "qpidVMBrokerClass"; - - //Phase pipe properties - public final static String PHASE_PIPE = "phasePipe"; - - public final static String PHASE = "phase"; - - public final static String INDEX = "[@index]"; -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java deleted file mode 100644 index cc1503d414..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.apache.qpid.nclient.core; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -public class TransientPhaseContext implements PhaseContext { - - private Map map = new ConcurrentHashMap(); - - public Object getProperty(String name) { - return map.get(name); - } - - public void setProperty(String name, Object value) { - map.put(name, value); - } - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java deleted file mode 100644 index 1305500439..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java +++ /dev/null @@ -1,188 +0,0 @@ -package org.apache.qpid.nclient.execution; - -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQRequestBody; -import org.apache.qpid.framing.AMQResponseBody; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.AbstractPhase; -import org.apache.qpid.nclient.core.QpidConstants; - -/** - * Corressponds to the Layer 2 in AMQP. - * This phase handles the correlation of amqp messages - * This class implements the 0.9 spec (request/response) - */ -public class ExecutionPhase extends AbstractPhase -{ - - protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class); - - protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap(); - - protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap(); - - /** - * -------------------------------------------------- - * Phase related methods - * -------------------------------------------------- - */ - - // should add these in the init method - //_channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false)); - //_channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false)); - public void messageReceived(Object msg) throws AMQPException - { - AMQFrame frame = (AMQFrame) msg; - final AMQBody bodyFrame = frame.getBodyFrame(); - - if (bodyFrame instanceof AMQRequestBody) - { - AMQPMethodEvent event; - try - { - event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody) bodyFrame); - super.messageReceived(event); - } - catch (Exception e) - { - _logger.error("Error handling request", e); - } - - } - else if (bodyFrame instanceof AMQResponseBody) - { - List<AMQPMethodEvent> events; - try - { - events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody) bodyFrame); - for (AMQPMethodEvent event : events) - { - super.messageReceived(event); - } - } - catch (Exception e) - { - _logger.error("Error handling response", e); - } - } - } - - /** - * Need to figure out if the message is a request or a response - * that needs to be sent and then delegate it to the Request or response manager - * to prepare it. - */ - public void messageSent(Object msg) throws AMQPException - { - AMQPMethodEvent evt = (AMQPMethodEvent) msg; - if (evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID) - { - // This is a request - AMQFrame frame = handleRequest(evt); - super.messageSent(frame); - } - else - { - // This is a response - List<AMQFrame> frames = handleResponse(evt); - for (AMQFrame frame : frames) - { - super.messageSent(frame); - } - } - } - - /** - * ------------------------------------------------ - * Methods to handle request response - * ----------------------------------------------- - */ - private AMQPMethodEvent messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Request frame received: " + requestBody); - } - - ResponseManager responseManager; - if(_channelId2ResponseMgrMap.containsKey(channelId)) - { - responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId); - } - else - { - responseManager = new ResponseManager(0,channelId,false); - _channelId2ResponseMgrMap.put(channelId, responseManager); - } - return responseManager.requestReceived(requestBody); - } - - private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody) - throws Exception - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Response frame received: " + responseBody); - } - - RequestManager requestManager; - if (_channelId2RequestMgrMap.containsKey(channelId)) - { - requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId); - } - else - { - requestManager = new RequestManager(0,channelId,false); - _channelId2RequestMgrMap.put(channelId, requestManager); - } - - return requestManager.responseReceived(responseBody); - } - - private AMQFrame handleRequest(AMQPMethodEvent evt) - { - int channelId = evt.getChannelId(); - RequestManager requestManager; - if (_channelId2RequestMgrMap.containsKey(channelId)) - { - requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId); - } - else - { - requestManager = new RequestManager(0,channelId,false); - _channelId2RequestMgrMap.put(channelId, requestManager); - } - return requestManager.sendRequest(evt); - } - - private List<AMQFrame> handleResponse(AMQPMethodEvent evt) throws AMQPException - { - int channelId = evt.getChannelId(); - ResponseManager responseManager; - if(_channelId2ResponseMgrMap.containsKey(channelId)) - { - responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId); - } - else - { - responseManager = new ResponseManager(0,channelId,false); - _channelId2ResponseMgrMap.put(channelId, responseManager); - } - try - { - return responseManager.sendResponse(evt); - } - catch (Exception e) - { - throw new AMQPException("Error handling response", e); - } - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java deleted file mode 100644 index 0084f27717..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.execution; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.log4j.Logger; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQRequestBody; -import org.apache.qpid.framing.AMQResponseBody; -import org.apache.qpid.framing.RequestResponseMappingException; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; - -public class RequestManager -{ - private static final Logger logger = Logger.getLogger(RequestManager.class); - - private int channel; - - /** - * Used for logging and debugging only - allows the context of this instance - * to be known. - */ - private boolean serverFlag; - private long connectionId; - - /** - * Request and response frames must have a requestID and responseID which - * indepenedently increment from 0 on a per-channel basis. These are the - * counters, and contain the value of the next (not yet used) frame. - */ - private long requestIdCount; - - /** - * These keep track of the last requestId and responseId to be received. - */ - private long lastProcessedResponseId; - - private ConcurrentHashMap<Long, CorrelationID> requestSentMap; - - public RequestManager(long connectionId, int channel, boolean serverFlag) - { - this.channel = channel; - this.serverFlag = serverFlag; - this.connectionId = connectionId; - requestIdCount = 1L; - lastProcessedResponseId = 0L; - requestSentMap = new ConcurrentHashMap<Long, CorrelationID>(); - } - - // *** Functions to originate a request *** - - public AMQFrame sendRequest(AMQPMethodEvent evt) - { - long requestId = getNextRequestId(); // Get new request ID - AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, - lastProcessedResponseId, evt.getMethod()); - if (logger.isDebugEnabled()) - { - logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + - "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + evt.getMethod()); - } - requestSentMap.put(requestId, new CorrelationID(evt.getCorrelationId(), evt.getLocalCorrelationId())); - return requestFrame; - } - - public List<AMQPMethodEvent> responseReceived(AMQResponseBody responseBody) - throws Exception - { - long requestIdStart = responseBody.getRequestId(); - long requestIdStop = requestIdStart + responseBody.getBatchOffset(); - if (logger.isDebugEnabled()) - { - logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " + - responseBody + "; " + responseBody.getMethodPayload()); - } - - List<AMQPMethodEvent> events = new ArrayList<AMQPMethodEvent>(); - for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) - { - if (requestSentMap.get(requestId) == null) - { - throw new RequestResponseMappingException(requestId, - "Failed to locate requestId " + requestId + " in requestSentMap."); - } - CorrelationID correlationID = requestSentMap.get(requestId); - AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, responseBody.getMethodPayload(), - correlationID.getSystemCorrelationID(),correlationID.getLocalCorrelationID()); - events.add(methodEvent); - requestSentMap.remove(requestId); - } - lastProcessedResponseId = responseBody.getResponseId(); - return events; - } - - // *** Management functions *** - - public int requestsMapSize() - { - return requestSentMap.size(); - } - - // *** Private helper functions *** - - private long getNextRequestId() - { - return requestIdCount++; - } - - private class CorrelationID - { - // Use for the request/response stuff - private long _systemCorrelationID; - // used internally to track callbacks - private long _localCorrelationID; - - CorrelationID(long systemCorrelationID,long localCorrelationID) - { - _localCorrelationID = localCorrelationID; - _systemCorrelationID = systemCorrelationID; - } - - public long getLocalCorrelationID() - { - return _localCorrelationID; - } - - public void setLocalCorrelationID(long correlationID) - { - _localCorrelationID = correlationID; - } - - public long getSystemCorrelationID() - { - return _systemCorrelationID; - } - - public void setSystemCorrelationID(long correlationID) - { - _systemCorrelationID = correlationID; - } - - - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java deleted file mode 100644 index c5a75d242f..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.execution; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.log4j.Logger; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQRequestBody; -import org.apache.qpid.framing.AMQResponseBody; -import org.apache.qpid.framing.RequestResponseMappingException; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.config.ClientConfiguration; -import org.apache.qpid.nclient.core.QpidConstants; - -public class ResponseManager -{ - private static final Logger logger = Logger.getLogger(ResponseManager.class); - - private int channel; - - /** - * Used for logging and debugging only - allows the context of this instance - * to be known. - */ - private boolean serverFlag; - private long connectionId; - - private int maxAccumulatedResponses = 20; // Default - - /** - * Request and response frames must have a requestID and responseID which - * indepenedently increment from 0 on a per-channel basis. These are the - * counters, and contain the value of the next (not yet used) frame. - */ - private long responseIdCount; - - /** - * These keep track of the last requestId and responseId to be received. - */ - private long lastReceivedRequestId; - - /** - * Last requestID sent in a response (for batching) - */ - private long lastSentRequestId; - - private class ResponseStatus implements Comparable<ResponseStatus> - { - private long requestId; - private AMQMethodBody responseMethodBody; - - public ResponseStatus(long requestId) - { - this.requestId = requestId; - responseMethodBody = null; - } - - public int compareTo(ResponseStatus o) - { - return (int)(requestId - o.requestId); - } - - public String toString() - { - // Need to define this - return ""; - } - } - - private ConcurrentHashMap<Long, ResponseStatus> responseMap; - - public ResponseManager(long connectionId, int channel, boolean serverFlag) - { - this.channel = channel; - this.serverFlag = serverFlag; - this.connectionId = connectionId; - responseIdCount = 1L; - lastReceivedRequestId = 0L; - maxAccumulatedResponses = ClientConfiguration.get().getInt(QpidConstants.MAX_ACCUMILATED_RESPONSES); - responseMap = new ConcurrentHashMap<Long, ResponseStatus>(); - } - - // *** Functions to handle an incoming request *** - - public AMQPMethodEvent requestReceived(AMQRequestBody requestBody) throws Exception - { - long requestId = requestBody.getRequestId(); - if (logger.isDebugEnabled()) - { - logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " + - requestBody + "; " + requestBody.getMethodPayload()); - } - long responseMark = requestBody.getResponseMark(); - lastReceivedRequestId = requestId; - responseMap.put(requestId, new ResponseStatus(requestId)); - - AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, - requestBody.getMethodPayload(), requestId); - - return methodEvent; - } - - public List<AMQFrame> sendResponse(AMQPMethodEvent evt) - throws RequestResponseMappingException - { - long requestId = evt.getCorrelationId(); - AMQMethodBody responseMethodBody = evt.getMethod(); - - if (logger.isDebugEnabled()) - { - logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + - "] TX RES: Res[# " + requestId + "]; " + responseMethodBody); - } - - ResponseStatus responseStatus = responseMap.get(requestId); - if (responseStatus == null) - { - throw new RequestResponseMappingException(requestId, - "Failed to locate requestId " + requestId + " in responseMap." + responseMap); - } - if (responseStatus.responseMethodBody != null) - { - throw new RequestResponseMappingException(requestId, "RequestId " + - requestId + " already has a response in responseMap."); - } - responseStatus.responseMethodBody = responseMethodBody; - return doBatches(); - } - - // *** Management functions *** - - /** - * Sends batched responses - i.e. all those members of responseMap that have - * received a response. - */ - public synchronized List<AMQFrame> doBatches() - { - long startRequestId = 0; - int numAdditionalRequestIds = 0; - Class responseMethodBodyClass = null; - List<AMQFrame> frames = new ArrayList<AMQFrame>(); - Iterator<Long> lItr = responseMap.keySet().iterator(); - while (lItr.hasNext()) - { - long requestId = lItr.next(); - ResponseStatus responseStatus = responseMap.get(requestId); - if (responseStatus.responseMethodBody != null) - { - frames.add(sendResponseBatchFrame(requestId, 0, responseStatus.responseMethodBody)); - lItr.remove(); - } - } - - return frames; - } - - /** - * Total number of entries in the responseMap - including both those that - * are outstanding (i.e. no response has been received) and those that are - * batched (those for which responses have been received but have not yet - * been collected together and sent). - */ - public int responsesMapSize() - { - return responseMap.size(); - } - - /** - * As the responseMap may contain both outstanding responses (those with - * ResponseStatus.responseMethodBody still null) and responses waiting to - * be batched (those with ResponseStatus.responseMethodBody not null), we - * need to count only those in the map with responseMethodBody null. - */ - public int outstandingResponses() - { - int cnt = 0; - for (Long requestId : responseMap.keySet()) - { - if (responseMap.get(requestId).responseMethodBody == null) - cnt++; - } - return cnt; - } - - /** - * As the responseMap may contain both outstanding responses (those with - * ResponseStatus.responseMethodBody still null) and responses waiting to - * be batched (those with ResponseStatus.responseMethodBody not null), we - * need to count only those in the map with responseMethodBody not null. - */ - public int batchedResponses() - { - int cnt = 0; - for (Long requestId : responseMap.keySet()) - { - if (responseMap.get(requestId).responseMethodBody != null) - cnt++; - } - return cnt; - } - - // *** Private helper functions *** - - private long getNextResponseId() - { - return responseIdCount++; - } - - private AMQFrame sendResponseBatchFrame(long firstRequestId, int numAdditionalRequests, - AMQMethodBody responseMethodBody) - { - long responseId = getNextResponseId(); // Get new response ID - AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId, - firstRequestId, numAdditionalRequests, responseMethodBody); - return responseFrame; - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java deleted file mode 100644 index d79525c5b2..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.nclient.message; - -import java.util.LinkedList; -import java.util.List; - -public class AMQPApplicationMessage { - - private int bytesReceived = 0; - private int channelId; - private byte[] referenceId; - private List<byte[]> contents = new LinkedList<byte[]>(); - private long deliveryTag; - private boolean redeliveredFlag; - private MessageHeaders messageHeaders; - - public AMQPApplicationMessage(int channelId, byte[] referenceId) - { - this.channelId = channelId; - this.referenceId = referenceId; - } - - public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag) - { - this.channelId = channelId; - this.deliveryTag = deliveryTag; - this.messageHeaders = messageHeaders; - this.redeliveredFlag = redeliveredFlag; - } - - public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content, boolean redeliveredFlag) - { - this.channelId = channelId; - this.deliveryTag = deliveryTag; - this.messageHeaders = messageHeaders; - this.redeliveredFlag = redeliveredFlag; - addContent(content); - } - - public void addContent(byte[] content) - { - contents.add(content); - bytesReceived += content.length; - } - - public int getBytesReceived() - { - return bytesReceived; - } - - public int getChannelId() - { - return channelId; - } - - public byte[] getReferenceId() - { - return referenceId; - } - - public List<byte[]> getContents() - { - return contents; - } - - public long getDeliveryTag() - { - return deliveryTag; - } - - public boolean getRedeliveredFlag() - { - return redeliveredFlag; - } - - public MessageHeaders getMessageHeaders() - { - return messageHeaders; - } - - public String toString() - { - return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" + - deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" + - new String(contents.get(0)); - } - - public void setDeliveryTag(long deliveryTag) - { - this.deliveryTag = deliveryTag; - } - - public void setMessageHeaders(MessageHeaders messageHeaders) - { - this.messageHeaders = messageHeaders; - } - - public void setRedeliveredFlag(boolean redeliveredFlag) - { - this.redeliveredFlag = redeliveredFlag; - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java deleted file mode 100644 index 562aa7b06e..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java +++ /dev/null @@ -1,679 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ - -package org.apache.qpid.nclient.message; - -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; -import org.apache.qpid.AMQPInvalidClassException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; - -import javax.jms.JMSException; -import javax.jms.MessageFormatException; -import java.util.Enumeration; - -public class MessageHeaders -{ - private static final Logger _logger = Logger.getLogger(MessageHeaders.class); - - private AMQShortString _contentType; - - private AMQShortString _encoding; - - private AMQShortString _destination; - - private AMQShortString _exchange; - - private FieldTable _jmsHeaders; - - private short _deliveryMode; - - private short _priority; - - private AMQShortString _correlationId; - - private AMQShortString _replyTo; - - private long _expiration; - - private AMQShortString _messageId; - - private long _timestamp; - - private AMQShortString _type; - - private AMQShortString _userId; - - private AMQShortString _appId; - - private AMQShortString _transactionId; - - private AMQShortString _routingKey; - - private int _size; - - public int getSize() - { - return _size; - } - - public void setSize(int size) - { - this._size = size; - } - - public MessageHeaders() - { - } - - public AMQShortString getContentType() - { - return _contentType; - } - - public void setContentType(AMQShortString contentType) - { - _contentType = contentType; - } - - public AMQShortString getEncoding() - { - return _encoding; - } - - public void setEncoding(AMQShortString encoding) - { - _encoding = encoding; - } - - public FieldTable getJMSHeaders() - { - if (_jmsHeaders == null) - { - setJMSHeaders(FieldTableFactory.newFieldTable()); - } - - return _jmsHeaders; - } - - public void setJMSHeaders(FieldTable headers) - { - _jmsHeaders = headers; - } - - - public short getDeliveryMode() - { - return _deliveryMode; - } - - public void setDeliveryMode(short deliveryMode) - { - _deliveryMode = deliveryMode; - } - - public short getPriority() - { - return _priority; - } - - public void setPriority(short priority) - { - _priority = priority; - } - - public AMQShortString getCorrelationId() - { - return _correlationId; - } - - public void setCorrelationId(AMQShortString correlationId) - { - _correlationId = correlationId; - } - - public AMQShortString getReplyTo() - { - return _replyTo; - } - - public void setReplyTo(AMQShortString replyTo) - { - _replyTo = replyTo; - } - - public long getExpiration() - { - return _expiration; - } - - public void setExpiration(long expiration) - { - _expiration = expiration; - } - - - public AMQShortString getMessageId() - { - return _messageId; - } - - public void setMessageId(AMQShortString messageId) - { - _messageId = messageId; - } - - public long getTimestamp() - { - return _timestamp; - } - - public void setTimestamp(long timestamp) - { - _timestamp = timestamp; - } - - public AMQShortString getType() - { - return _type; - } - - public void setType(AMQShortString type) - { - _type = type; - } - - public AMQShortString getUserId() - { - return _userId; - } - - public void setUserId(AMQShortString userId) - { - _userId = userId; - } - - public AMQShortString getAppId() - { - return _appId; - } - - public void setAppId(AMQShortString appId) - { - _appId = appId; - } - - // MapMessage Interface - - public boolean getBoolean(AMQShortString string) throws JMSException - { - Boolean b = getJMSHeaders().getBoolean(string); - - if (b == null) - { - if (getJMSHeaders().containsKey(string)) - { - Object str = getJMSHeaders().getObject(string); - - if (str == null || !(str instanceof AMQShortString)) - { - throw new MessageFormatException("getBoolean can't use " + string + " item."); - } - else - { - return Boolean.valueOf(((AMQShortString)str).asString()); - } - } - else - { - b = Boolean.valueOf(null); - } - } - - return b; - } - - public char getCharacter(AMQShortString string) throws JMSException - { - Character c = getJMSHeaders().getCharacter(string); - - if (c == null) - { - if (getJMSHeaders().isNullStringValue(string.asString())) - { - throw new NullPointerException("Cannot convert null char"); - } - else - { - throw new MessageFormatException("getChar can't use " + string + " item."); - } - } - else - { - return (char) c; - } - } - - public byte[] getBytes(AMQShortString string) throws JMSException - { - byte[] bs = getJMSHeaders().getBytes(string); - - if (bs == null) - { - throw new MessageFormatException("getBytes can't use " + string + " item."); - } - else - { - return bs; - } - } - - public byte getByte(AMQShortString string) throws JMSException - { - Byte b = getJMSHeaders().getByte(string); - if (b == null) - { - if (getJMSHeaders().containsKey(string)) - { - Object str = getJMSHeaders().getObject(string); - - if (str == null || !(str instanceof AMQShortString)) - { - throw new MessageFormatException("getByte can't use " + string + " item."); - } - else - { - return Byte.valueOf(((AMQShortString)str).asString()); - } - } - else - { - b = Byte.valueOf(null); - } - } - - return b; - } - - public short getShort(AMQShortString string) throws JMSException - { - Short s = getJMSHeaders().getShort(string); - - if (s == null) - { - s = Short.valueOf(getByte(string)); - } - - return s; - } - - public int getInteger(AMQShortString string) throws JMSException - { - Integer i = getJMSHeaders().getInteger(string); - - if (i == null) - { - i = Integer.valueOf(getShort(string)); - } - - return i; - } - - public long getLong(AMQShortString string) throws JMSException - { - Long l = getJMSHeaders().getLong(string); - - if (l == null) - { - l = Long.valueOf(getInteger(string)); - } - - return l; - } - - public float getFloat(AMQShortString string) throws JMSException - { - Float f = getJMSHeaders().getFloat(string); - - if (f == null) - { - if (getJMSHeaders().containsKey(string)) - { - Object str = getJMSHeaders().getObject(string); - - if (str == null || !(str instanceof AMQShortString)) - { - throw new MessageFormatException("getFloat can't use " + string + " item."); - } - else - { - return Float.valueOf(((AMQShortString)str).asString()); - } - } - else - { - f = Float.valueOf(null); - } - - } - - return f; - } - - public double getDouble(AMQShortString string) throws JMSException - { - Double d = getJMSHeaders().getDouble(string); - - if (d == null) - { - d = Double.valueOf(getFloat(string)); - } - - return d; - } - - public AMQShortString getString(AMQShortString string) throws JMSException - { - AMQShortString s = new AMQShortString(getJMSHeaders().getString(string.asString())); - - if (s == null) - { - if (getJMSHeaders().containsKey(string)) - { - Object o = getJMSHeaders().getObject(string); - if (o instanceof byte[]) - { - throw new MessageFormatException("getObject couldn't find " + string + " item."); - } - else - { - if (o == null) - { - return null; - } - else - { - s = (AMQShortString) o; - } - } - } - } - - return s; - } - - public Object getObject(AMQShortString string) throws JMSException - { - return getJMSHeaders().getObject(string); - } - - public void setBoolean(AMQShortString string, boolean b) throws JMSException - { - checkPropertyName(string); - getJMSHeaders().setBoolean(string, b); - } - - public void setChar(AMQShortString string, char c) throws JMSException - { - checkPropertyName(string); - getJMSHeaders().setChar(string, c); - } - - public Object setBytes(AMQShortString string, byte[] bytes) - { - return getJMSHeaders().setBytes(string, bytes); - } - - public Object setBytes(AMQShortString string, byte[] bytes, int start, int length) - { - return getJMSHeaders().setBytes(string, bytes, start, length); - } - - public void setByte(AMQShortString string, byte b) throws JMSException - { - checkPropertyName(string); - getJMSHeaders().setByte(string, b); - } - - public void setShort(AMQShortString string, short i) throws JMSException - { - checkPropertyName(string); - getJMSHeaders().setShort(string, i); - } - - public void setInteger(AMQShortString string, int i) throws JMSException - { - checkPropertyName(string); - getJMSHeaders().setInteger(string, i); - } - - public void setLong(AMQShortString string, long l) throws JMSException - { - checkPropertyName(string); - getJMSHeaders().setLong(string, l); - } - - public void setFloat(AMQShortString string, float v) throws JMSException - { - checkPropertyName(string); - getJMSHeaders().setFloat(string, v); - } - - public void setDouble(AMQShortString string, double v) throws JMSException - { - checkPropertyName(string); - getJMSHeaders().setDouble(string, v); - } - - public void setString(AMQShortString string, AMQShortString string1) throws JMSException - { - checkPropertyName(string); - getJMSHeaders().setString(string.asString(), string1.asString()); - } - - public void setObject(AMQShortString string, Object object) throws JMSException - { - checkPropertyName(string); - try - { - getJMSHeaders().setObject(string, object); - } - catch (AMQPInvalidClassException aice) - { - throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass()); - } - } - - public boolean itemExists(AMQShortString string) throws JMSException - { - return getJMSHeaders().containsKey(string); - } - - public Enumeration getPropertyNames() - { - return getJMSHeaders().getPropertyNames(); - } - - public void clear() - { - getJMSHeaders().clear(); - } - - public boolean propertyExists(AMQShortString propertyName) - { - return getJMSHeaders().propertyExists(propertyName); - } - - public Object put(Object key, Object value) - { - return getJMSHeaders().setObject(key.toString(), value); - } - - public Object remove(AMQShortString propertyName) - { - return getJMSHeaders().remove(propertyName); - } - - public boolean isEmpty() - { - return getJMSHeaders().isEmpty(); - } - - public void writeToBuffer(ByteBuffer data) - { - getJMSHeaders().writeToBuffer(data); - } - - public Enumeration getMapNames() - { - return getPropertyNames(); - } - - protected static void checkPropertyName(CharSequence propertyName) - { - if (propertyName == null) - { - throw new IllegalArgumentException("Property name must not be null"); - } - else if (propertyName.length() == 0) - { - throw new IllegalArgumentException("Property name must not be the empty string"); - } - - checkIdentiferFormat(propertyName); - } - - protected static void checkIdentiferFormat(CharSequence propertyName) - { -// JMS requirements 3.5.1 Property Names -// Identifiers: -// - An identifier is an unlimited-length character sequence that must begin -// with a Java identifier start character; all following characters must be Java -// identifier part characters. An identifier start character is any character for -// which the method Character.isJavaIdentifierStart returns true. This includes -// '_' and '$'. An identifier part character is any character for which the -// method Character.isJavaIdentifierPart returns true. -// - Identifiers cannot be the names NULL, TRUE, or FALSE. -// – Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or -// ESCAPE. -// – Identifiers are either header field references or property references. The -// type of a property value in a message selector corresponds to the type -// used to set the property. If a property that does not exist in a message is -// referenced, its value is NULL. The semantics of evaluating NULL values -// in a selector are described in Section 3.8.1.2, “Null Values.” -// – The conversions that apply to the get methods for properties do not -// apply when a property is used in a message selector expression. For -// example, suppose you set a property as a string value, as in the -// following: -// myMessage.setStringProperty("NumberOfOrders", "2"); -// The following expression in a message selector would evaluate to false, -// because a string cannot be used in an arithmetic expression: -// "NumberOfOrders > 1" -// – Identifiers are case sensitive. -// – Message header field references are restricted to JMSDeliveryMode, -// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and -// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be -// null and if so are treated as a NULL value. - - if (Boolean.getBoolean("strict-jms")) - { - // JMS start character - if (!(Character.isJavaIdentifierStart(propertyName.charAt(0)))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character"); - } - - // JMS part character - int length = propertyName.length(); - for (int c = 1; c < length; c++) - { - if (!(Character.isJavaIdentifierPart(propertyName.charAt(c)))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character"); - } - } - - - - - // JMS invalid names - if ((propertyName.equals("NULL") - || propertyName.equals("TRUE") - || propertyName.equals("FALSE") - || propertyName.equals("NOT") - || propertyName.equals("AND") - || propertyName.equals("OR") - || propertyName.equals("BETWEEN") - || propertyName.equals("LIKE") - || propertyName.equals("IN") - || propertyName.equals("IS") - || propertyName.equals("ESCAPE"))) - { - throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS"); - } - } - - } - - public AMQShortString getTransactionId() - { - return _transactionId; - } - - public void setTransactionId(AMQShortString id) - { - _transactionId = id; - } - - public AMQShortString getDestination() - { - return _destination; - } - - public void setDestination(AMQShortString destination) - { - this._destination = destination; - } - - public AMQShortString getExchange() - { - return _exchange; - } - - public void setExchange(AMQShortString exchange) - { - this._exchange = exchange; - } - - public AMQShortString getRoutingKey() - { - return _routingKey; - } - - public void setRoutingKey(AMQShortString routingKey) - { - this._routingKey = routingKey; - } -} - - diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java deleted file mode 100644 index 53a2142718..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.message; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.log4j.Logger; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.AbstractPhase; -import org.apache.qpid.nclient.model.ModelPhase; - -public class MessagePhase extends AbstractPhase { - - private final BlockingQueue<AMQPApplicationMessage> _queue = new LinkedBlockingQueue<AMQPApplicationMessage>(); - private static final Logger _logger = Logger.getLogger(ModelPhase.class); - - public void messageReceived(Object msg) throws AMQPException - { - try - { - _queue.put((AMQPApplicationMessage)msg); - } - catch (InterruptedException e) - { - _logger.error("Error adding message to queue", e); - } - super.messageReceived(msg); - } - - public void messageSent(Object msg) throws AMQPException - { - super.messageSent(msg); - } - - public AMQPApplicationMessage getNextMessage() - { - return _queue.poll(); - } - - public AMQPApplicationMessage getNextMessage(long timeout, TimeUnit tu) throws InterruptedException - { - return _queue.poll(timeout, tu); - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java deleted file mode 100644 index efd7264f96..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.apache.qpid.nclient.message; - -import org.apache.qpid.AMQException; - -public interface MessageStore { - - public void removeMessage(String identifier); - - public void storeContentBodyChunk(String identifier,byte[] contentBody) throws AMQException; - - public void storeMessageMetaData(String identifier, MessageHeaders messageHeaders) throws AMQException; - - public AMQPApplicationMessage getMessage(String identifier) throws AMQException; - - public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException; -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java deleted file mode 100644 index eb5a9c1778..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java +++ /dev/null @@ -1,39 +0,0 @@ -package org.apache.qpid.nclient.message; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.qpid.AMQException; - -public class TransientMessageStore implements MessageStore { - - private Map<String,AMQPApplicationMessage> messageMap = new ConcurrentHashMap<String,AMQPApplicationMessage>(); - - public AMQPApplicationMessage getMessage(String identifier) - throws AMQException - { - return messageMap.get(identifier); - } - - public void removeMessage(String identifier) - { - messageMap.remove(identifier); - } - - public void storeContentBodyChunk(String identifier, byte[] contentBody) - throws AMQException - { - - } - - public void storeMessageMetaData(String identifier, - MessageHeaders messageHeaders) throws AMQException - { - - } - - public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException - { - - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java deleted file mode 100644 index d845059ee7..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java +++ /dev/null @@ -1,71 +0,0 @@ -package org.apache.qpid.nclient.model; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.log4j.Logger; -import org.apache.qpid.nclient.amqp.event.AMQPEventManager; -import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.AbstractPhase; -import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.core.PhaseContext; -import org.apache.qpid.nclient.core.QpidConstants; - -/** - * This Phase handles Layer 3 functionality of the AMQP spec. - * This class acts as the interface between the API and the pipeline - */ -public class ModelPhase extends AbstractPhase { - - private static final Logger _logger = Logger.getLogger(ModelPhase.class); - - private Map <Class,List> _methodListners = new HashMap<Class,List>(); - - /** - * ------------------------------------------------ - * Phase - methods introduced by Phase - * ------------------------------------------------ - */ - public void init(PhaseContext ctx, Phase nextInFlowPhase, Phase nextOutFlowPhase) - { - super.init(ctx, nextInFlowPhase, nextOutFlowPhase); - } - - public void messageReceived(Object msg) throws AMQPException - { - notifyMethodListerners((AMQPMethodEvent)msg); - - // not doing super.methodReceived here, as this is the end of - // the pipeline - //super.messageReceived(msg); - } - - /** - * This method should only except and pass messages - * of Type @see AMQPMethodEvent - */ - public void messageSent(Object msg) throws AMQPException - { - super.messageSent(msg); - } - - /** - * ------------------------------------------------ - * Event Handling - * ------------------------------------------------ - */ - - public void notifyMethodListerners(AMQPMethodEvent event) throws AMQPException - { - AMQPEventManager eventManager = (AMQPEventManager)_ctx.getProperty(QpidConstants.EVENT_MANAGER); - eventManager.notifyEvent(event); - } - - /** - * ------------------------------------------------ - * Configuration - * ------------------------------------------------ - */ -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java deleted file mode 100644 index fc5878f6ef..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.security; - -import javax.security.auth.callback.CallbackHandler; - -import org.apache.qpid.nclient.transport.ConnectionURL; - -public interface AMQPCallbackHandler extends CallbackHandler -{ - void initialise(ConnectionURL url); -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java deleted file mode 100644 index 428cd6753d..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.security; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.log4j.Logger; -import org.apache.qpid.nclient.config.ClientConfiguration; -import org.apache.qpid.nclient.core.QpidConstants; - -public class CallbackHandlerRegistry -{ - private static final Logger _logger = Logger.getLogger(CallbackHandlerRegistry.class); - - private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry(); - - private Map<String,Class> _mechanismToHandlerClassMap = new HashMap<String,Class>(); - - private String _mechanisms; - - public static CallbackHandlerRegistry getInstance() - { - return _instance; - } - - public Class getCallbackHandlerClass(String mechanism) - { - return _mechanismToHandlerClassMap.get(mechanism); - } - - public String getMechanisms() - { - return _mechanisms; - } - - private CallbackHandlerRegistry() - { - // first we register any Sasl client factories - DynamicSaslRegistrar.registerSaslProviders(); - parseProperties(); - } - - private void parseProperties() - { - String key = QpidConstants.AMQP_SECURITY + "." + - QpidConstants.AMQP_SECURITY_MECHANISMS + "." + - QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER; - - int index = ClientConfiguration.get().getMaxIndex(key); - - for (int i=0; i<index+1;i++) - { - String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]"); - String className = ClientConfiguration.get().getString(key + "(" + i + ")" ); - Class clazz = null; - try - { - clazz = Class.forName(className); - if (!AMQPCallbackHandler.class.isAssignableFrom(clazz)) - { - _logger.warn("SASL provider " + clazz + " does not implement " + AMQPCallbackHandler.class + - ". Skipping"); - continue; - } - _mechanismToHandlerClassMap.put(mechanism, clazz); - if (_mechanisms == null) - { - _mechanisms = mechanism; - } - else - { - // one time cost - _mechanisms = _mechanisms + " " + mechanism; - } - } - catch (ClassNotFoundException ex) - { - _logger.warn("Unable to load class " + className + ". Skipping that SASL provider"); - continue; - } - } - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java deleted file mode 100644 index 958c6c4782..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.security; - -import java.security.Security; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import javax.security.sasl.SaslClientFactory; - -import org.apache.log4j.Logger; -import org.apache.qpid.nclient.config.ClientConfiguration; -import org.apache.qpid.nclient.core.QpidConstants; - -public class DynamicSaslRegistrar -{ - private static final Logger _logger = Logger.getLogger(DynamicSaslRegistrar.class); - - public static void registerSaslProviders() - { - Map<String, Class<? extends SaslClientFactory>> factories = parseProperties(); - if (factories.size() > 0) - { - Security.addProvider(new JCAProvider(factories)); - _logger.debug("Dynamic SASL provider added as a security provider"); - } - } - - private static Map<String, Class<? extends SaslClientFactory>> parseProperties() - { - List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES); - TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister = - new TreeMap<String, Class<? extends SaslClientFactory>>(); - for (String mechanism: mechanisms) - { - String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY + "_" + mechanism); - try - { - Class<?> clazz = Class.forName(className); - if (!(SaslClientFactory.class.isAssignableFrom(clazz))) - { - _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping"); - continue; - } - factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz); - } - catch (Exception ex) - { - _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping"); - } - } - return factoriesToRegister; - } - - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java deleted file mode 100644 index 10ccb88821..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.security; - -import javax.security.sasl.SaslClientFactory; -import java.security.Provider; -import java.security.Security; -import java.util.Map; - -public class JCAProvider extends Provider -{ - public JCAProvider(Map<String, Class<? extends SaslClientFactory>> providerMap) - { - super("AMQSASLProvider", 1.0, "A JCA provider that registers all " + - "AMQ SASL providers that want to be registered"); - register(providerMap); - Security.addProvider(this); - } - - private void register(Map<String, Class<? extends SaslClientFactory>> providerMap) - { - for (Map.Entry<String, Class<? extends SaslClientFactory>> me : - providerMap.entrySet()) - { - put("SaslClientFactory." + me.getKey(), me.getValue().getName()); - } - } -}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java deleted file mode 100644 index 7297d07134..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.security; - -import java.io.IOException; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; - -import org.apache.qpid.nclient.transport.ConnectionURL; - -public class UsernamePasswordCallbackHandler implements AMQPCallbackHandler -{ - private ConnectionURL _url; - - public void initialise(ConnectionURL url) - { - _url = url; - } - - public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException - { - for (int i = 0; i < callbacks.length; i++) - { - Callback cb = callbacks[i]; - if (cb instanceof NameCallback) - { - ((NameCallback)cb).setName(_url.getUsername()); - } - else if (cb instanceof PasswordCallback) - { - ((PasswordCallback)cb).setPassword((_url.getPassword()).toCharArray()); - } - else - { - throw new UnsupportedCallbackException(cb); - } - } - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java deleted file mode 100644 index 1097346c1d..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.security.amqplain; - -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; - -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.Callback; - -/** - * Implements the "AMQPlain" authentication protocol that uses FieldTables to send username and pwd. - * - */ -public class AmqPlainSaslClient implements SaslClient -{ - /** - * The name of this mechanism - */ - public static final String MECHANISM = "AMQPLAIN"; - - private CallbackHandler _cbh; - - public AmqPlainSaslClient(CallbackHandler cbh) - { - _cbh = cbh; - } - - public String getMechanismName() - { - return "AMQPLAIN"; - } - - public boolean hasInitialResponse() - { - return true; - } - - public byte[] evaluateChallenge(byte[] challenge) throws SaslException - { - // we do not care about the prompt or the default name - NameCallback nameCallback = new NameCallback("prompt", "defaultName"); - PasswordCallback pwdCallback = new PasswordCallback("prompt", false); - Callback[] callbacks = new Callback[]{nameCallback, pwdCallback}; - try - { - _cbh.handle(callbacks); - } - catch (Exception e) - { - throw new SaslException("Error handling SASL callbacks: " + e, e); - } - FieldTable table = FieldTableFactory.newFieldTable(); - table.setString("LOGIN", nameCallback.getName()); - table.setString("PASSWORD", new String(pwdCallback.getPassword())); - return table.getDataAsBytes(); - } - - public boolean isComplete() - { - return true; - } - - public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException - { - throw new SaslException("Not supported"); - } - - public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException - { - throw new SaslException("Not supported"); - } - - public Object getNegotiatedProperty(String propName) - { - return null; - } - - public void dispose() throws SaslException - { - _cbh = null; - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java deleted file mode 100644 index f98c1e3a58..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.security.amqplain; - -import javax.security.sasl.SaslClientFactory; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; -import javax.security.sasl.Sasl; -import javax.security.auth.callback.CallbackHandler; -import java.util.Map; - -public class AmqPlainSaslClientFactory implements SaslClientFactory -{ - public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException - { - for (int i = 0; i < mechanisms.length; i++) - { - if (mechanisms[i].equals(AmqPlainSaslClient.MECHANISM)) - { - if (cbh == null) - { - throw new SaslException("CallbackHandler must not be null"); - } - return new AmqPlainSaslClient(cbh); - } - } - return null; - } - - public String[] getMechanismNames(Map props) - { - if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || - props.containsKey(Sasl.POLICY_NODICTIONARY) || - props.containsKey(Sasl.POLICY_NOACTIVE)) - { - // returned array must be non null according to interface documentation - return new String[0]; - } - else - { - return new String[]{AmqPlainSaslClient.MECHANISM}; - } - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java deleted file mode 100644 index 9e878fb839..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java +++ /dev/null @@ -1,344 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.transport; - -import org.apache.qpid.url.URLHelper; -import org.apache.qpid.url.URLSyntaxException; - -import java.util.HashMap; -import java.net.URISyntaxException; -import java.net.URI; - -public class AMQPBrokerDetails implements BrokerDetails -{ - private String _host; - - private int _port; - - private String _transport; - - private HashMap<String, String> _options; - - public AMQPBrokerDetails() - { - _options = new HashMap<String, String>(); - } - - public AMQPBrokerDetails(String url) throws URLSyntaxException - { - this(); - // URL should be of format tcp://host:port?option='value',option='value' - try - { - URI connection = new URI(url); - - String transport = connection.getScheme(); - - // Handles some defaults to minimise changes to existing broker URLS e.g. localhost - if (transport != null) - { - //todo this list of valid transports should be enumerated somewhere - if ((!(transport.equalsIgnoreCase("vm") || transport.equalsIgnoreCase("tcp")))) - { - if (transport.equalsIgnoreCase("localhost")) - { - connection = new URI(DEFAULT_TRANSPORT + "://" + url); - transport = connection.getScheme(); - } - else - { - if (url.charAt(transport.length()) == ':' && url.charAt(transport.length() + 1) != '/') - { - //Then most likely we have a host:port value - connection = new URI(DEFAULT_TRANSPORT + "://" + url); - transport = connection.getScheme(); - } - else - { - URLHelper.parseError(0, transport.length(), "Unknown transport", url); - } - } - } - } - else - { - //Default the transport - connection = new URI(DEFAULT_TRANSPORT + "://" + url); - transport = connection.getScheme(); - } - - if (transport == null) - { - URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" + " In broker URL:'" + url - + "' Format: " + URL_FORMAT_EXAMPLE, ""); - } - - setTransport(transport); - - String host = connection.getHost(); - - // Fix for Java 1.5 - if (host == null) - { - host = ""; - } - - setHost(host); - - int port = connection.getPort(); - - if (port == -1) - { - // Fix for when there is port data but it is not automatically parseable by getPort(). - String auth = connection.getAuthority(); - - if (auth != null && auth.contains(":")) - { - int start = auth.indexOf(":") + 1; - int end = start; - boolean looking = true; - boolean found = false; - //Walk the authority looking for a port value. - while (looking) - { - try - { - end++; - Integer.parseInt(auth.substring(start, end)); - - if (end >= auth.length()) - { - looking = false; - found = true; - } - } - catch (NumberFormatException nfe) - { - looking = false; - } - - } - if (found) - { - setPort(Integer.parseInt(auth.substring(start, end))); - } - else - { - URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1, - "Illegal character in port number", connection.toString()); - } - - } - else - { - setPort(DEFAULT_PORT); - } - } - else - { - setPort(port); - } - - String queryString = connection.getQuery(); - - URLHelper.parseOptions(_options, queryString); - - //Fragment is #string (not used) - } - catch (URISyntaxException uris) - { - if (uris instanceof URLSyntaxException) - { - throw (URLSyntaxException) uris; - } - - URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); - } - } - - public AMQPBrokerDetails(String host, int port, boolean useSSL) - { - _host = host; - _port = port; - - if (useSSL) - { - setOption(OPTIONS_SSL, "true"); - } - } - - public String getHost() - { - return _host; - } - - public void setHost(String _host) - { - this._host = _host; - } - - public int getPort() - { - return _port; - } - - public void setPort(int _port) - { - this._port = _port; - } - - public String getTransport() - { - return _transport; - } - - public void setTransport(String _transport) - { - this._transport = _transport; - } - - public String getOption(String key) - { - return _options.get(key); - } - - public void setOption(String key, String value) - { - _options.put(key, value); - } - - public long getTimeout() - { - if (_options.containsKey(OPTIONS_CONNECT_TIMEOUT)) - { - try - { - return Long.parseLong(_options.get(OPTIONS_CONNECT_TIMEOUT)); - } - catch (NumberFormatException nfe) - { - //Do nothing as we will use the default below. - } - } - - return BrokerDetails.DEFAULT_CONNECT_TIMEOUT; - } - - public void setTimeout(long timeout) - { - setOption(OPTIONS_CONNECT_TIMEOUT, Long.toString(timeout)); - } - - public String toString() - { - StringBuffer sb = new StringBuffer(); - - sb.append(_transport); - sb.append("://"); - - if (!(_transport.equalsIgnoreCase("vm"))) - { - sb.append(_host); - } - - sb.append(':'); - sb.append(_port); - - sb.append(printOptionsURL()); - - return sb.toString(); - } - - public boolean equals(Object o) - { - if (!(o instanceof BrokerDetails)) - { - return false; - } - - BrokerDetails bd = (BrokerDetails) o; - - return _host.equalsIgnoreCase(bd.getHost()) && (_port == bd.getPort()) - && _transport.equalsIgnoreCase(bd.getTransport()) && (useSSL() == bd.useSSL()); - - //todo do we need to compare all the options as well? - } - - private String printOptionsURL() - { - StringBuffer optionsURL = new StringBuffer(); - - optionsURL.append('?'); - - if (!(_options.isEmpty())) - { - - for (String key : _options.keySet()) - { - optionsURL.append(key); - - optionsURL.append("='"); - - optionsURL.append(_options.get(key)); - - optionsURL.append("'"); - - optionsURL.append(URLHelper.DEFAULT_OPTION_SEPERATOR); - } - } - - //removeKey the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options - optionsURL.deleteCharAt(optionsURL.length() - 1); - - return optionsURL.toString(); - } - - public boolean useSSL() - { - // To be friendly to users we should be case insensitive. - // or simply force users to conform to OPTIONS_SSL - // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL - - if (_options.containsKey(OPTIONS_SSL)) - { - return _options.get(OPTIONS_SSL).equalsIgnoreCase("true"); - } - - return USE_SSL_DEFAULT; - } - - public void useSSL(boolean ssl) - { - setOption(OPTIONS_SSL, Boolean.toString(ssl)); - } - - public static String checkTransport(String broker) - { - if ((!broker.contains("://"))) - { - return "tcp://" + broker; - } - else - { - return broker; - } - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java deleted file mode 100644 index d0259830c6..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java +++ /dev/null @@ -1,412 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.transport; - -import org.apache.qpid.url.URLHelper; -import org.apache.qpid.url.URLSyntaxException; - -import java.util.*; -import java.net.InetAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; - -public class AMQPConnectionURL implements ConnectionURL -{ - private String _url; - private String _failoverMethod; - private HashMap<String, String> _failoverOptions; - private HashMap<String, String> _options; - private List<BrokerDetails> _brokers; - private String _clientName; - private String _username; - private String _password; - private String _virtualHost; - - public AMQPConnectionURL(String fullURL) throws URLSyntaxException - { - _url = fullURL; - _options = new HashMap<String, String>(); - _brokers = new LinkedList<BrokerDetails>(); - _failoverOptions = new HashMap<String, String>(); - - try - { - URI connection = new URI(fullURL); - - if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL))) - { - throw new URISyntaxException(fullURL, "Not an AMQP URL"); - } - - if (connection.getHost() == null || connection.getHost().equals("")) - { - String uid = getUniqueClientID(); - if (uid == null) - { - URLHelper.parseError(-1, "Client Name not specified", fullURL); - } - else - { - setClientName(uid); - } - - } - else - { - setClientName(connection.getHost()); - } - - String userInfo = connection.getUserInfo(); - - if (userInfo == null) - { - //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs - userInfo = connection.getAuthority(); - - if (userInfo != null) - { - int atIndex = userInfo.indexOf('@'); - - if (atIndex != -1) - { - userInfo = userInfo.substring(0, atIndex); - } - else - { - userInfo = null; - } - } - - } - - if (userInfo == null) - { - URLHelper.parseError(AMQ_PROTOCOL.length() + 3, - "User information not found on url", fullURL); - } - else - { - parseUserInfo(userInfo); - } - String virtualHost = connection.getPath(); - - if (virtualHost != null && (!virtualHost.equals(""))) - { - setVirtualHost(virtualHost); - } - else - { - int authLength = connection.getAuthority().length(); - int start = AMQ_PROTOCOL.length() + 3; - int testIndex = start + authLength; - if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?') - { - URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL); - } - else - { - URLHelper.parseError(-1, "Virtual host not specified", fullURL); - } - - } - - - URLHelper.parseOptions(_options, connection.getQuery()); - - processOptions(); - - //Fragment is #string (not used) - //System.out.println(connection.getFragment()); - - } - catch (URISyntaxException uris) - { - if (uris instanceof URLSyntaxException) - { - throw (URLSyntaxException) uris; - } - - int slash = fullURL.indexOf("\\"); - - if (slash == -1) - { - URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); - } - else - { - if (slash != 0 && fullURL.charAt(slash - 1) == ':') - { - URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL); - } - else - { - URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL); - } - } - - } - } - - private String getUniqueClientID() - { - try - { - InetAddress addr = InetAddress.getLocalHost(); - return addr.getHostName() + System.currentTimeMillis(); - } - catch (UnknownHostException e) - { - return null; - } - } - - private void parseUserInfo(String userinfo) throws URLSyntaxException - { - //user info = user:pass - - int colonIndex = userinfo.indexOf(':'); - - if (colonIndex == -1) - { - URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(), - "Null password in user information not allowed.", _url); - } - else - { - setUsername(userinfo.substring(0, colonIndex)); - setPassword(userinfo.substring(colonIndex + 1)); - } - - } - - private void processOptions() throws URLSyntaxException - { - if (_options.containsKey(OPTIONS_BROKERLIST)) - { - String brokerlist = _options.get(OPTIONS_BROKERLIST); - - //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value' - StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR); - - while (st.hasMoreTokens()) - { - String broker = st.nextToken(); - - _brokers.add(new AMQPBrokerDetails(broker)); - } - - _options.remove(OPTIONS_BROKERLIST); - } - - if (_options.containsKey(OPTIONS_FAILOVER)) - { - String failover = _options.get(OPTIONS_FAILOVER); - - // failover='method?option='value',option='value'' - - int methodIndex = failover.indexOf('?'); - - if (methodIndex > -1) - { - _failoverMethod = failover.substring(0, methodIndex); - URLHelper.parseOptions(_failoverOptions, failover.substring(methodIndex + 1)); - } - else - { - _failoverMethod = failover; - } - - _options.remove(OPTIONS_FAILOVER); - } - } - - public String getURL() - { - return _url; - } - - public String getFailoverMethod() - { - return _failoverMethod; - } - - public String getFailoverOption(String key) - { - return _failoverOptions.get(key); - } - - public void setFailoverOption(String key, String value) - { - _failoverOptions.put(key, value); - } - - public int getBrokerCount() - { - return _brokers.size(); - } - - public BrokerDetails getBrokerDetails(int index) - { - if (index < _brokers.size()) - { - return _brokers.get(index); - } - else - { - return null; - } - } - - public void addBrokerDetails(BrokerDetails broker) - { - if (!(_brokers.contains(broker))) - { - _brokers.add(broker); - } - } - - public List<BrokerDetails> getAllBrokerDetails() - { - return _brokers; - } - - public String getClientName() - { - return _clientName; - } - - public void setClientName(String clientName) - { - _clientName = clientName; - } - - public String getUsername() - { - return _username; - } - - public void setUsername(String username) - { - _username = username; - } - - public String getPassword() - { - return _password; - } - - public void setPassword(String password) - { - _password = password; - } - - public String getVirtualHost() - { - return _virtualHost; - } - - public void setVirtualHost(String virtuaHost) - { - _virtualHost = virtuaHost; - } - - public String getOption(String key) - { - return _options.get(key); - } - - public void setOption(String key, String value) - { - _options.put(key, value); - } - - public String toString() - { - StringBuffer sb = new StringBuffer(); - - sb.append(AMQ_PROTOCOL); - sb.append("://"); - - if (_username != null) - { - sb.append(_username); - - if (_password != null) - { - sb.append(':'); - sb.append(_password); - } - - sb.append('@'); - } - - sb.append(_clientName); - - sb.append(_virtualHost); - - sb.append(optionsToString()); - - return sb.toString(); - } - - private String optionsToString() - { - StringBuffer sb = new StringBuffer(); - - sb.append("?" + OPTIONS_BROKERLIST + "='"); - - for (BrokerDetails service : _brokers) - { - sb.append(service.toString()); - sb.append(';'); - } - - sb.deleteCharAt(sb.length() - 1); - sb.append("'"); - - if (_failoverMethod != null) - { - sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); - sb.append(OPTIONS_FAILOVER + "='"); - sb.append(_failoverMethod); - sb.append(URLHelper.printOptions(_failoverOptions)); - sb.append("'"); - } - - return sb.toString(); - } - - - public static void main(String[] args) throws URLSyntaxException - { - - String url2 = "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'"; - //"amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''"; - - //ConnectionURL connectionurl2 = new AMQConnectionURL(url2); - - System.out.println(url2); - //System.out.println(connectionurl2); - - } - -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java deleted file mode 100644 index fe20a1e8dd..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.transport; - -public interface BrokerDetails -{ - - /* - * Known URL Options - * @see ConnectionURL - */ - public static final String OPTIONS_RETRY = "retries"; - public static final String OPTIONS_SSL = ConnectionURL.OPTIONS_SSL; - public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; - public static final int DEFAULT_PORT = 5672; - - public static final String TCP = "tcp"; - public static final String VM = "vm"; - - public static final String DEFAULT_TRANSPORT = TCP; - - public static final String URL_FORMAT_EXAMPLE = - "<transport>://<hostname>[:<port Default=\"" + DEFAULT_PORT + "\">][?<option>='<value>'[,<option>='<value>']]"; - - public static final long DEFAULT_CONNECT_TIMEOUT = 30000L; - public static final boolean USE_SSL_DEFAULT = false; - - String getHost(); - - void setHost(String host); - - int getPort(); - - void setPort(int port); - - String getTransport(); - - void setTransport(String transport); - - boolean useSSL(); - - void useSSL(boolean ssl); - - String getOption(String key); - - void setOption(String key, String value); - - long getTimeout(); - - void setTimeout(long timeout); - - String toString(); - - boolean equals(Object o); -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java deleted file mode 100644 index 79653dc442..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.transport; - -import java.util.List; - -/** - Connection URL format - For TCP: - amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\' - - For VMBroker: - vm://:3/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''" - - Options are of course optional except for requiring a single broker in the broker list. - The option seperator is defined to be either '&' or ',' - */ -public interface ConnectionURL -{ - public static final String AMQ_PROTOCOL = "amqp"; - public static final String OPTIONS_BROKERLIST = "brokerlist"; - public static final String OPTIONS_FAILOVER = "failover"; - public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; - public static final String OPTIONS_SSL = "ssl"; - - String getURL(); - - String getFailoverMethod(); - - String getFailoverOption(String key); - - int getBrokerCount(); - - BrokerDetails getBrokerDetails(int index); - - void addBrokerDetails(BrokerDetails broker); - - List<BrokerDetails> getAllBrokerDetails(); - - String getClientName(); - - void setClientName(String clientName); - - String getUsername(); - - void setUsername(String username); - - String getPassword(); - - void setPassword(String password); - - String getVirtualHost(); - - void setVirtualHost(String virtualHost); - - String getOption(String key); - - void setOption(String key, String value); -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java deleted file mode 100644 index 734aa68a9d..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java +++ /dev/null @@ -1,83 +0,0 @@ -package org.apache.qpid.nclient.transport; - -import org.apache.log4j.Logger; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.transport.socket.nio.SocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.qpid.nclient.config.ClientConfiguration; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.DefaultPhaseContext; -import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.core.PhaseContext; -import org.apache.qpid.nclient.core.PhaseFactory; -import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.pool.ReadWriteThreadModel; - -public class TCPConnection implements TransportConnection -{ - private static final Logger _logger = Logger.getLogger(TCPConnection.class); - private BrokerDetails _brokerDetails; - private IoConnector _ioConnector; - private Phase _phase; - private PhaseContext _ctx; - - protected TCPConnection(ConnectionURL url, PhaseContext ctx) - { - _brokerDetails = url.getBrokerDetails(0); - _ctx = ctx; - - ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS)); - - // the MINA default is currently to use the pooled allocator although this may change in future - // once more testing of the performance of the simple allocator has been done - if (ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_POOLED_ALLOCATOR)) - { - // Not sure what the original code wanted use :) - } - else - { - _logger.info("Using SimpleByteBufferAllocator"); - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - } - - _ioConnector = new SocketConnector(); - SocketConnectorConfig cfg = (SocketConnectorConfig) _ioConnector.getDefaultConfig(); - - // if we do not use our own thread model we get the MINA default which is to use - // its own leader-follower model - if (ClientConfiguration.get().getBoolean(QpidConstants.USE_SHARED_READ_WRITE_POOL)) - { - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); - } - - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); - scfg.setTcpNoDelay(ClientConfiguration.get().getBoolean(QpidConstants.TCP_NO_DELAY)); - scfg.setSendBufferSize(ClientConfiguration.get().getInt(QpidConstants.SEND_BUFFER_SIZE_IN_KB)*1024); - scfg.setReceiveBufferSize(ClientConfiguration.get().getInt(QpidConstants.RECEIVE_BUFFER_SIZE_IN_KB)*1024); - } - - // Returns the phase pipe - public Phase connect() throws AMQPException - { - _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); - _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); - - _phase = PhaseFactory.createPhasePipe(_ctx); - _phase.start(); - - return _phase; - } - - public void close() throws AMQPException - { - - } - - public Phase getPhasePipe() - { - return _phase; - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java deleted file mode 100644 index 9df353a2df..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.transport; - - -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.Phase; - -public interface TransportConnection -{ - public Phase connect()throws AMQPException; - - public void close()throws AMQPException; - - public Phase getPhasePipe(); -}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java deleted file mode 100644 index ce2145c08b..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.apache.qpid.nclient.transport; - -import java.net.URISyntaxException; - -import org.apache.qpid.nclient.core.PhaseContext; - -public class TransportConnectionFactory -{ - public enum ConnectionType - { - TCP,VM - } - - public static TransportConnection createTransportConnection(String url,ConnectionType type, PhaseContext ctx) throws URISyntaxException - { - return createTransportConnection(new AMQPConnectionURL(url),type,ctx); - - } - - public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type, PhaseContext ctx) - { - switch (type) - { - case TCP : default: - { - return new TCPConnection(url,ctx); - } - - case VM : - { - return new VMConnection(url,ctx); - } - } - - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java deleted file mode 100644 index 911e855d4f..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.nclient.transport; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; - -import org.apache.log4j.Logger; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.WriteFuture; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.filter.codec.ProtocolCodecFilter; -import org.apache.mina.transport.vmpipe.VmPipeAddress; -import org.apache.mina.transport.vmpipe.VmPipeConnector; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersionList; -import org.apache.qpid.nclient.config.ClientConfiguration; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.AbstractPhase; -import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.ssl.BogusSSLContextFactory; - -/** - * The Transport Phase corresponds to the Layer 1 in AMQP It works at the Frame - * layer - * - */ -public class TransportPhase extends AbstractPhase implements IoHandler, ProtocolVersionList -{ - - private static final Logger _logger = Logger - .getLogger(TransportPhase.class); - - private IoSession _ioSession; - private BrokerDetails _brokerDetails; - - protected WriteFuture _lastWriteFuture; - - /** - * ------------------------------------------------ - * Phase - methods introduced by Phase - * ------------------------------------------------ - */ - - public void start()throws AMQPException - { - _brokerDetails = (BrokerDetails)_ctx.getProperty(QpidConstants.AMQP_BROKER_DETAILS); - IoConnector ioConnector = (IoConnector)_ctx.getProperty(QpidConstants.MINA_IO_CONNECTOR); - - final SocketAddress address; - if (ioConnector instanceof VmPipeConnector) - { - address = new VmPipeAddress(_brokerDetails.getPort()); - } - else - { - address = new InetSocketAddress(_brokerDetails.getHost(), _brokerDetails.getPort()); - _logger.info("Attempting connection to " + address); - - } - - ConnectFuture future = ioConnector.connect(address,this); - - // wait for connection to complete - if (future.join(_brokerDetails.getTimeout())) - { - // we call getSession which throws an IOException if there has been an error connecting - future.getSession(); - } - else - { - throw new AMQPException("Timeout waiting for connection."); - } - } - - public void messageReceived(Object frame) throws AMQPException - { - super.messageReceived(frame); - } - - public void messageSent(Object frame) throws AMQPException - { - _ioSession.write(frame); - } - - /** - * ------------------------------------------------ - * IoHandler - methods introduced by IoHandler - * ------------------------------------------------ - */ - public void sessionIdle(IoSession session, IdleStatus status) - throws Exception - { - _logger.debug("Protocol Session for [ " + this + " : " + session + "] idle: " - + status); - if (IdleStatus.WRITER_IDLE.equals(status)) - { - // write heartbeat frame: - _logger.debug("Sent heartbeat"); - session.write(HeartbeatBody.FRAME); - // HeartbeatDiagnostics.sent(); - } else if (IdleStatus.READER_IDLE.equals(status)) - { - // failover: - // HeartbeatDiagnostics.timeout(); - _logger.warn("Timed out while waiting for heartbeat from peer."); - session.close(); - } - } - - public void messageReceived(IoSession session, Object message) - throws Exception - { - AMQFrame frame = (AMQFrame) message; - final AMQBody bodyFrame = frame.getBodyFrame(); - - if (bodyFrame instanceof HeartbeatBody) - { - _logger.debug("Received heartbeat"); - } else - { - messageReceived(frame); - } - } - - public void messageSent(IoSession session, Object message) throws Exception - { - _logger.debug("Sent frame " + message); - } - - public void exceptionCaught(IoSession session, Throwable cause) - throws Exception - { - // Need to handle failover - _logger.info("Exception caught for [ " + this + " : Session " + System.identityHashCode(session) + "]",cause); - //sessionClosed(session); - } - - public void sessionClosed(IoSession session) throws Exception - { - // Need to handle failover - _logger.info("Protocol Session for [ " + this + " : " + System.identityHashCode(session) + "] closed"); - } - - public void sessionCreated(IoSession session) throws Exception - { - _logger.info("Protocol session created for " + this + " session : " - + System.identityHashCode(session)); - - final ProtocolCodecFilter pcf = new ProtocolCodecFilter( - new AMQCodecFactory(false)); - - if (ClientConfiguration.get().getBoolean( - QpidConstants.USE_SHARED_READ_WRITE_POOL)) - { - session.getFilterChain().addBefore("AsynchronousWriteFilter", - "protocolFilter", pcf); - } - else - { - session.getFilterChain().addLast("protocolFilter", pcf); - } - // we only add the SSL filter where we have an SSL connection - if (_brokerDetails.useSSL()) - { - // FIXME: Bogus context cannot be used in production. - SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory - .getInstance(false)); - sslFilter.setUseClientMode(true); - session.getFilterChain().addBefore("protocolFilter", "ssl", - sslFilter); - } - - try - { - - ReadWriteThreadModel threadModel = ReadWriteThreadModel - .getInstance(); - threadModel.getAsynchronousReadFilter().createNewJobForSession( - session); - threadModel.getAsynchronousWriteFilter().createNewJobForSession( - session); - } catch (RuntimeException e) - { - e.printStackTrace(); - } - - _ioSession = session; - doAMQPConnectionNegotiation(); - } - - public void sessionOpened(IoSession session) throws Exception - { - _logger.info("Protocol session opened for " + this + " : session " - + System.identityHashCode(session)); - } - - /** - * ---------------------------------------------------------- - * Protocol related methods - * ---------------------------------------------------------- - */ - private void doAMQPConnectionNegotiation() - { - int i = pv.length - 1; - _logger.debug("Engaging in connection negotiation"); - writeFrame(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); - } - - /** - * ---------------------------------------------------------- - * Write Operations - * ---------------------------------------------------------- - */ - public void writeFrame(AMQDataBlock frame) - { - writeFrame(frame, false); - } - - public void writeFrame(AMQDataBlock frame, boolean wait) - { - WriteFuture f = _ioSession.write(frame); - if (wait) - { - // fixme -- time out? - f.join(); - } else - { - _lastWriteFuture = f; - } - } - - /** - * ----------------------------------------------------------- - * Failover section - * ----------------------------------------------------------- - */ -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java deleted file mode 100644 index ba38848149..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java +++ /dev/null @@ -1,135 +0,0 @@ -package org.apache.qpid.nclient.transport; - -import java.io.IOException; - -import org.apache.log4j.Logger; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.transport.vmpipe.VmPipeAcceptor; -import org.apache.mina.transport.vmpipe.VmPipeAddress; -import org.apache.mina.transport.vmpipe.VmPipeConnector; -import org.apache.qpid.nclient.config.ClientConfiguration; -import org.apache.qpid.nclient.core.AMQPException; -import org.apache.qpid.nclient.core.DefaultPhaseContext; -import org.apache.qpid.nclient.core.Phase; -import org.apache.qpid.nclient.core.PhaseContext; -import org.apache.qpid.nclient.core.PhaseFactory; -import org.apache.qpid.nclient.core.QpidConstants; -import org.apache.qpid.pool.PoolingFilter; -import org.apache.qpid.pool.ReadWriteThreadModel; -import org.apache.qpid.pool.ReferenceCountingExecutorService; - -public class VMConnection implements TransportConnection -{ - private static final Logger _logger = Logger.getLogger(VMConnection.class); - private BrokerDetails _brokerDetails; - private IoConnector _ioConnector; - private Phase _phase; - private PhaseContext _ctx; - - protected VMConnection(ConnectionURL url,PhaseContext ctx) - { - _brokerDetails = url.getBrokerDetails(0); - _ctx = ctx; - - _ioConnector = new VmPipeConnector(); - final IoServiceConfig cfg = _ioConnector.getDefaultConfig(); - ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance(); - PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService, - "AsynchronousReadFilter"); - cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead); - PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService, - "AsynchronousWriteFilter"); - cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite); - } - - public Phase connect() throws AMQPException - { - createVMBroker(); - - _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); - _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); - - _phase = PhaseFactory.createPhasePipe(_ctx); - _phase.start(); - - return _phase; - - } - - private void createVMBroker()throws AMQPException - { - _logger.info("Creating InVM Qpid.AMQP listening on port " + _brokerDetails.getPort()); - - VmPipeAcceptor acceptor = new VmPipeAcceptor(); - IoServiceConfig config = acceptor.getDefaultConfig(); - config.setThreadModel(ReadWriteThreadModel.getInstance()); - - IoHandlerAdapter provider = null; - try - { - VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort()); - provider = createBrokerInstance(_brokerDetails.getPort()); - acceptor.bind(pipe, provider); - _logger.info("Created InVM Qpid.AMQP listening on port " + _brokerDetails.getPort()); - } - catch (IOException e) - { - _logger.error(e); - VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort()); - acceptor.unbind(pipe); - - throw new AMQPException("Error creating VM broker",e); - } - } - - private IoHandlerAdapter createBrokerInstance(int port) throws AMQPException - { - String protocolProviderClass = ClientConfiguration.get().getString(QpidConstants.QPID_VM_BROKER_CLASS); - _logger.info("Creating Qpid protocol provider: " + protocolProviderClass); - - // can't use introspection to get Provider as it is a server class. - // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access. - - //get correct constructor and pass in instancec ID - "port" - IoHandlerAdapter provider; - try - { - Class[] cnstr = {Integer.class}; - Object[] params = {port}; - provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); - //Give the broker a second to create - _logger.info("Created VMBroker Instance:" + port); - } - catch (Exception e) - { - _logger.info("Unable to create InVM Qpid broker on port " + port + ". due to : " + e.getCause()); - _logger.error(e); - String because; - if (e.getCause() == null) - { - because = e.toString(); - } - else - { - because = e.getCause().toString(); - } - - - throw new AMQPException(port, because + " Stopped InVM Qpid.AMQP creation",e); - } - - return provider; - } - - public void close() throws AMQPException - { - - } - - public Phase getPhasePipe() - { - return _phase; - } -} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java b/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java deleted file mode 100644 index e161e30227..0000000000 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.apache.qpid.nclient.util; - -import org.apache.qpid.nclient.core.AMQPException; - -public class AMQPValidator -{ - public static void throwExceptionOnNull(Object obj, String msg) throws AMQPException - { - if(obj == null) - { - throw new AMQPException(msg); - } - } -} |
