diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-04-30 19:21:52 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-04-30 19:21:52 +0000 |
| commit | 02e8e01f5e41a6a3f7123c73abdbe229e37af381 (patch) | |
| tree | 4e03d7bb972c635167324ed891fec25a76850e5d /java | |
| parent | 311ac3b34c254109bc74ffa962850fba1d5b85e5 (diff) | |
| download | qpid-python-02e8e01f5e41a6a3f7123c73abdbe229e37af381.tar.gz | |
moving the new low level client to the trunk
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@533831 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
75 files changed, 8178 insertions, 0 deletions
diff --git a/java/newclient/.project b/java/newclient/.project new file mode 100644 index 0000000000..4d42311982 --- /dev/null +++ b/java/newclient/.project @@ -0,0 +1,17 @@ +<projectDescription> + <name>qpid-newclient</name> + <comment/> + <projects> + <project>qpid-broker</project> + <project>qpid-common</project> + </projects> + <buildSpec> + <buildCommand> + <name>org.eclipse.jdt.core.javabuilder</name> + <arguments/> + </buildCommand> + </buildSpec> + <natures> + <nature>org.eclipse.jdt.core.javanature</nature> + </natures> +</projectDescription>
\ No newline at end of file diff --git a/java/newclient/pom.xml b/java/newclient/pom.xml new file mode 100644 index 0000000000..5a82feb1d2 --- /dev/null +++ b/java/newclient/pom.xml @@ -0,0 +1,194 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-newclient</artifactId> + <packaging>jar</packaging> + <version>1.0-incubating-M2-SNAPSHOT</version> + <name>Qpid New Client</name> + <url>http://cwiki.apache.org/confluence/display/qpid</url> + + <parent> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid</artifactId> + <version>1.0-incubating-M2-SNAPSHOT</version> + </parent> + + <properties> + <topDirectoryLocation>..</topDirectoryLocation> + <java.source.version>1.5</java.source.version> + <qpid.version>${pom.version}</qpid.version> + <qpid.targetDir>${project.build.directory}</qpid.targetDir> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-common</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + </dependency> + + <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + </dependency> + + <dependency> + <groupId>commons-configuration</groupId> + <artifactId>commons-configuration</artifactId> + <version>1.3</version> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.mina</groupId> + <artifactId>mina-filter-ssl</artifactId> + </dependency> + + <!-- Test Dependencies --> + <dependency> <!-- for inVm Broker --> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-broker</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>jmscts</groupId> + <artifactId>jmscts</artifactId> + <version>0.5-b2</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymockclassextension</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemProperties> + <property> + <name>amqj.noAutoCreateVMBroker</name> + <value>true</value> + </property> + <property> + <name>amqj.logging.level</name> + <value>${amqj.logging.level}</value> + </property> + <property> + <name>log4j.configuration</name> + <value>file:///${basedir}/src/main/java/client.log4j</value> + </property> + </systemProperties> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + </plugins> + +<!-- The inclusion of this resource causes the build to hang. --> + <!--resources> + <resource> + <targetPath>META-INF/</targetPath> + <filtering>false</filtering> + <directory>../resources/META-INF</directory> + <includes> + <include>**</include> + </includes> + </resource> + </resources--> + + <testResources> + <testResource> + <targetPath>META-INF/</targetPath> + <filtering>false</filtering> + <directory>../resources/META-INF</directory> + <includes> + <include>**</include> + </includes> + </testResource> + <testResource> + <targetPath>src/</targetPath> + <filtering>false</filtering> + <directory>src/test/java</directory> + <includes> + <include>**/*.java</include> + </includes> + </testResource> + + <testResource> + <targetPath></targetPath> + <filtering>false</filtering> + <directory>src/main/java</directory> + <includes> + <include>client.log4j</include> + </includes> + </testResource> + </testResources> + + </build> + +</project> diff --git a/java/newclient/src/main/java/client.log4j b/java/newclient/src/main/java/client.log4j new file mode 100644 index 0000000000..c2c7326479 --- /dev/null +++ b/java/newclient/src/main/java/client.log4j @@ -0,0 +1,29 @@ +#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=DEBUG
+
+
+#log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.logger.org.apache.qpid=DEBUG, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java new file mode 100644 index 0000000000..a733fb7db7 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBack.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQMethodBody; + +public abstract class AMQPCallBack +{ + private boolean _isComplete = false; + + public abstract void brokerResponded(AMQMethodBody body); + + public abstract void brokerRespondedWithError(AMQException e); + + public void setIsComplete(boolean isComplete) + { + _isComplete = isComplete; + } + + public boolean isComplete() + { + return _isComplete; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java new file mode 100644 index 0000000000..f984e812c2 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import java.security.SecureRandom; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.QpidConstants; + +public abstract class AMQPCallBackSupport +{ + private SecureRandom _localCorrelationIdGenerator = new SecureRandom(); + protected ConcurrentHashMap<Long,AMQPCallBack> _cbMap = new ConcurrentHashMap<Long,AMQPCallBack>(); + + //the channelId assigned for this instance + protected int _channelId; + + public AMQPCallBackSupport(int channelId) + { + _channelId = channelId; + } + + private long getNextCorrelationId() + { + return _localCorrelationIdGenerator.nextLong(); + } + + + // For methods that still use nowait, hopefully they will remove nowait + protected AMQPMethodEvent handleNoWait(boolean noWait,AMQMethodBody methodBody,AMQPCallBack cb) + { + if(noWait) + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID); + return msg; + } + else + { + // u only need to register if u are expecting a response + long localCorrelationId = getNextCorrelationId(); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId); + _cbMap.put(localCorrelationId, cb); + return msg; + } + } + + protected AMQPMethodEvent handleAsynchronousCall(AMQMethodBody methodBody,AMQPCallBack cb) + { + long localCorrelationId = getNextCorrelationId(); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId); + _cbMap.put(localCorrelationId, cb); + return msg; + } + + protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody)throws AMQPException + { + if(_cbMap.containsKey(localCorrelationId)) + { + AMQPCallBack cb = (AMQPCallBack)_cbMap.get(localCorrelationId); + if(cb == null) + { + throw new AMQPException("Unable to find the callback object responsible for handling " + methodBody); + } + else + { + cb.setIsComplete(true); + cb.brokerResponded(methodBody); + } + _cbMap.remove(localCorrelationId); + } + else + { + //ignore, as this event is for another class instance + } + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java new file mode 100644 index 0000000000..cbb60b130d --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ChannelOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ChannelResumeBody; +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPChannel +{ + + /** + * Opens the channel + */ + public abstract ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException; + + /** + * Close the channel + */ + public abstract ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException; + + /** + * Channel Flow + */ + public abstract ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException; + + /** + * Close the channel + */ + public abstract ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException; + +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java new file mode 100644 index 0000000000..4976daa4fa --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.nclient.amqp.event.AMQPEventManager; +import org.apache.qpid.nclient.amqp.qpid.QpidAMQPChannel; +import org.apache.qpid.nclient.amqp.qpid.QpidAMQPExchange; +import org.apache.qpid.nclient.amqp.qpid.QpidAMQPMessage; +import org.apache.qpid.nclient.amqp.qpid.QpidAMQPQueue; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.transport.ConnectionURL; +import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; +import org.apache.qpid.url.URLSyntaxException; + +public interface AMQPClassFactory +{ + + public abstract AMQPConnection createConnectionClass(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException; + + public abstract AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException; + + public abstract AMQPChannel createChannelClass(int channel) throws AMQPException; + + public abstract void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException; + + public abstract AMQPExchange createExchangeClass(int channel) throws AMQPException; + + public abstract void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException; + + public abstract AMQPQueue createQueueClass(int channel) throws AMQPException; + + public abstract void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException; + + public abstract AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException; + + public abstract void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException; + + /** + * Extention point + * Other interested parties can obtain a reference to the event manager + * and add listeners to get notified of events + * + */ + public abstract AMQPEventManager getEventManager(); + + /** + * Extention point + * Other interested parties can obtain a reference to the state manager + * and add listeners to get notified of state changes + * + */ + public abstract AMQPStateManager getStateManager(); + +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java new file mode 100644 index 0000000000..b18fed5605 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.ConnectionOpenBody; +import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPConnection +{ + + /** + * Opens the TCP connection and let the formalities begin. + */ + public abstract ConnectionStartBody openTCPConnection() throws AMQPException; + + /** + * The current java broker implementation can send a connection tune body + * as a response to the startOk. Not sure if that is the correct behaviour. + */ + public abstract AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException; + + /** + * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could + * issue a new challenge + */ + public abstract AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException; + + public abstract void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException; + + public abstract ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException; + + public abstract ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException; + +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java new file mode 100644 index 0000000000..53e11c48fb --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxCoordination.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.DtxCoordinationCommitBody; +import org.apache.qpid.framing.DtxCoordinationForgetBody; +import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody; +import org.apache.qpid.framing.DtxCoordinationPrepareBody; +import org.apache.qpid.framing.DtxCoordinationRecoverBody; +import org.apache.qpid.framing.DtxCoordinationRollbackBody; +import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody; +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPDtxCoordination +{ + public void commit(DtxCoordinationCommitBody dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException; + + public void forget(DtxCoordinationForgetBody dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException; + + public void getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException; + + public void prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException; + + public void recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException; + + public void rollback(DtxCoordinationRollbackBody dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException; + + public void setTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxDemarcation.java new file mode 100644 index 0000000000..41f1414205 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPDtxDemarcation.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.DtxDemarcationEndBody; +import org.apache.qpid.framing.DtxDemarcationEndOkBody; +import org.apache.qpid.framing.DtxDemarcationSelectBody; +import org.apache.qpid.framing.DtxDemarcationSelectOkBody; +import org.apache.qpid.framing.DtxDemarcationStartBody; +import org.apache.qpid.framing.DtxDemarcationStartOkBody; +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPDtxDemarcation +{ + public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException; + + public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException; + + public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java new file mode 100644 index 0000000000..c2f93b8f42 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeleteBody; +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPExchange +{ + + /** + * ----------------------------------------------- + * API Methods + * ----------------------------------------------- + */ + public abstract void declare(ExchangeDeclareBody exchangeDeclareBody, AMQPCallBack cb) throws AMQPException; + + public abstract void delete(ExchangeDeleteBody exchangeDeleteBody, AMQPCallBack cb) throws AMQPException; + +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java new file mode 100644 index 0000000000..61c5825fe0 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java @@ -0,0 +1,94 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageConsumeBody; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.framing.MessageOffsetBody; +import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageQosBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageRejectBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPMessage +{ + + /** + * ----------------------------------------------- + * API Methods + * ----------------------------------------------- + */ + + public abstract void transfer(MessageTransferBody messageTransferBody, AMQPCallBack cb) throws AMQPException; + + public abstract void consume(MessageConsumeBody messageConsumeBody, AMQPCallBack cb) throws AMQPException; + + public abstract void cancel(MessageCancelBody messageCancelBody, AMQPCallBack cb) throws AMQPException; + + public abstract void get(MessageGetBody messageGetBody, AMQPCallBack cb) throws AMQPException; + + public abstract void recover(MessageRecoverBody messageRecoverBody, AMQPCallBack cb) throws AMQPException; + + public abstract void open(MessageOpenBody messageOpenBody, AMQPCallBack cb) throws AMQPException; + + public abstract void close(MessageCloseBody messageCloseBody, AMQPCallBack cb) throws AMQPException; + + public abstract void append(MessageAppendBody messageAppendBody, AMQPCallBack cb) throws AMQPException; + + public abstract void checkpoint(MessageCheckpointBody messageCheckpointBody, AMQPCallBack cb) throws AMQPException; + + public abstract void resume(MessageResumeBody messageResumeBody, AMQPCallBack cb) throws AMQPException; + + public abstract void qos(MessageQosBody messageQosBody, AMQPCallBack cb) throws AMQPException; + + /** + * The correlationId from the request. + * For example if a message.transfer is sent with correlationId "ABCD" + * then u need to pass that in. This correlation id is used by the execution layer + * to handle the correlation of method requests and responses + */ + public abstract void ok(MessageOkBody messageOkBody, long correlationId) throws AMQPException; + + /** + * The correlationId from the request. + * For example if a message.transfer is sent with correlationId "ABCD" + * then u need to pass that in. This correlation id is used by the execution layer + * to handle the correlation of method requests and responses + */ + public abstract void reject(MessageRejectBody messageRejectBody, long correlationId) throws AMQPException; + + /** + * The correlationId from the request. + * For example if a message.resume is sent with correlationId "ABCD" + * then u need to pass that in. This correlation id is used by the execution layer + * to handle the correlation of method requests and responses + */ + public abstract void offset(MessageOffsetBody messageOffsetBody, long correlationId) throws AMQPException; + +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java new file mode 100644 index 0000000000..c10d6975c6 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.nclient.core.AMQPException; + +/** + * This class also represents the AMQP Message class. + * You need an instance per channel. + * This is passed in as an argument in the constructor of an AMQPMessage instance. + * A client who implements this interface is notified When the broker issues + * Message class methods on the client. + * + * A Client should use the AMQPMessage class when it wants to issue Message class + * methods on the broker. + * + * A JMS MessageConsumer implementation can implement this interface and map + * AMQP Method notifications to the appropriate JMS methods. + * + * Simillarly a JMS MessageProducer implementation can wrap an AMQPMessage instance. + * + */ + +public interface AMQPMessageCallBack +{ + /** + * ----------------------------------------------------------------------- + * This provides Notifications for broker initiated Message class methods. + * All methods have a correlationId that u need to pass into + * the corresponding Message methods when responding to the broker. + * + * For example the correlationID passed in from Message.trasnfer + * should be passed back when u call Message.ok in AMQPMessage + * ----------------------------------------------------------------------- + */ + + + public void transfer(MessageTransferBody messageTransferBody,long correlationId) throws AMQPException; + + public void recover(MessageRecoverBody messageRecoverBody,long correlationId) throws AMQPException; + + public void open(MessageOpenBody messageOpenBody,long correlationId) throws AMQPException ; + + public void close(MessageCloseBody messageCloseBody,long correlationId) throws AMQPException; + + public void append(MessageAppendBody messageAppendBody,long correlationId) throws AMQPException; + + public void checkpoint(MessageCheckpointBody messageCheckpointBody,long correlationId) throws AMQPException; + + public void resume(MessageResumeBody messageResumeBody,long correlationId) throws AMQPException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java new file mode 100644 index 0000000000..ff26c6adf5 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.QueuePurgeBody; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPQueue +{ + + /** + * ----------------------------------------------- + * API Methods + * ----------------------------------------------- + */ + public abstract void declare(QueueDeclareBody queueDeclareBody, AMQPCallBack cb) throws AMQPException; + + public abstract void bind(QueueBindBody queueBindBody, AMQPCallBack cb) throws AMQPException; + + // Queue.unbind doesn't have nowait + public abstract void unbind(QueueUnbindBody queueUnbindBody, AMQPCallBack cb) throws AMQPException; + + public abstract void purge(QueuePurgeBody queuePurgeBody, AMQPCallBack cb) throws AMQPException; + + public abstract void delete(QueueDeleteBody queueDeleteBody, AMQPCallBack cb) throws AMQPException; + +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java new file mode 100644 index 0000000000..6d0e83bb7e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AbstractAMQPClassFactory.java @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp; + +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.QpidConstants; + +public class AbstractAMQPClassFactory +{ + public static AMQPClassFactory getFactoryInstance() throws AMQPException + { + String className = ClientConfiguration.get().getString(QpidConstants.AMQP_CLASS_FACTORY); + try + { + return (AMQPClassFactory)Class.forName(className).newInstance(); + } + catch(Exception e) + { + throw new AMQPException("Error creating AMQPClassFactory",e); + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java new file mode 100644 index 0000000000..f682659f9e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java @@ -0,0 +1,11 @@ +package org.apache.qpid.nclient.amqp.event; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPEventManager +{ + public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l); + public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l); + public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException; +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java new file mode 100644 index 0000000000..c6641890e0 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java @@ -0,0 +1,78 @@ +package org.apache.qpid.nclient.amqp.event; + +import org.apache.qpid.framing.AMQMethodBody; + +/** + * This class is exactly the same as the AMQMethod event. + * Except I renamed requestId to corelationId, so I could use it both ways. + * + * I didn't want to modify anything in common so that there is no + * impact on the existing code. + * + */ +public class AMQPMethodEvent<M extends AMQMethodBody> +{ + + private final M _method; + + private final int _channelId; + + /** + * This is the rquest id from the broker when it sent me a request + * when I respond I remember this id and copy this to the outgoing + * response. + */ + private final long _correlationId; + + /** + * I could use _correlationId, bcos when I send a request + * this field is blank and is only used internally. But I + * used a seperate field to make it more clear. + */ + private long _localCorrletionId = 0; + + public AMQPMethodEvent(int channelId, M method, long correlationId, long localCorrletionId) + { + _channelId = channelId; + _method = method; + _correlationId = correlationId; + _localCorrletionId = localCorrletionId; + } + + public AMQPMethodEvent(int channelId, M method, long correlationId) + { + _channelId = channelId; + _method = method; + _correlationId = correlationId; + } + + public M getMethod() + { + return _method; + } + + public int getChannelId() + { + return _channelId; + } + + public long getCorrelationId() + { + return _correlationId; + } + + public long getLocalCorrelationId() + { + return _localCorrletionId; + } + + public String toString() + { + StringBuilder buf = new StringBuilder("Method event: \n"); + buf.append("Channel id: \n").append(_channelId); + buf.append("Method: \n").append(_method); + buf.append("Request Id: ").append(_correlationId); + buf.append("Local Correlation Id: ").append(_localCorrletionId); + return buf.toString(); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java new file mode 100644 index 0000000000..e77a38121c --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java @@ -0,0 +1,11 @@ +package org.apache.qpid.nclient.amqp.event; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPMethodListener +{ + + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException; + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java new file mode 100644 index 0000000000..decb120796 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java @@ -0,0 +1,368 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.nclient.amqp.qpid; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ChannelOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ChannelResumeBody; +import org.apache.qpid.nclient.amqp.AMQPChannel; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; +import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.util.AMQPValidator; + +/** + * This represents the Channel class defined in the AMQP protocol. This class is a finite state machine and is thread + * safe by design. Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. Only + * one thread can enter the methods that change state, at a given time. The AMQP protocol recommends one thread per + * channel by design. + * + * A JMS Session can wrap an instance of this class. + */ + +public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListener, AMQPChannel +{ + private static final Logger _logger = Logger.getLogger(QpidAMQPChannel.class); + + // the channelId assigned for this channel + private int _channelId; + + private Phase _phase; + + private AMQPState _currentState; + + private AMQPStateManager _stateManager; + + private final AMQPState[] _validCloseStates = new AMQPState[] + { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND }; + + private final AMQPState[] _validResumeStates = new AMQPState[] + { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED }; + + // The wait period until a server sends a respond + private long _serverTimeOut = 1000; + + private final Lock _lock = new ReentrantLock(); + + private final Condition _channelNotOpend = _lock.newCondition(); + + private final Condition _channelNotClosed = _lock.newCondition(); + + private final Condition _channelFlowNotResponded = _lock.newCondition(); + + private final Condition _channelNotResumed = _lock.newCondition(); + + private ChannelOpenOkBody _channelOpenOkBody; + + private ChannelCloseOkBody _channelCloseOkBody; + + private ChannelFlowOkBody _channelFlowOkBody; + + private ChannelOkBody _channelOkBody; + + private ChannelCloseBody _channelCloseBody; + + protected QpidAMQPChannel(int channelId, Phase phase, AMQPStateManager stateManager) + { + _channelId = channelId; + _phase = phase; + _stateManager = stateManager; + _currentState = AMQPState.CHANNEL_NOT_OPENED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + } + + /** + * ------------------------------------------- + * API Methods + * -------------------------------------------- + */ + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPChannel#open(org.apache.qpid.framing.ChannelOpenBody) + */ + public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException + { + _lock.lock(); + try + { + _channelOpenOkBody = null; + checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelNotOpend.await(); + checkIfChannelClosed(); + AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time"); + notifyState(AMQPState.CHANNEL_OPENED); + _currentState = AMQPState.CHANNEL_OPENED; + return _channelOpenOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.open", e); + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPChannel#close(org.apache.qpid.framing.ChannelCloseBody) + */ + public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException + { + _lock.lock(); + try + { + _channelCloseOkBody = null; + checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelNotClosed.await(); + AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time"); + notifyState(AMQPState.CHANNEL_CLOSED); + _currentState = AMQPState.CHANNEL_CLOSED; + return _channelCloseOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.close", e); + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPChannel#flow(org.apache.qpid.framing.ChannelFlowBody) + */ + public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException + { + _lock.lock(); + try + { + _channelFlowOkBody = null; + if (channelFlowBody.active) + { + checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED); + } + else + { + checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND); + } + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelFlowNotResponded.await(); + checkIfChannelClosed(); + AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time"); + handleChannelFlowState(_channelFlowOkBody.active); + return _channelFlowOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.flow", e); + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPChannel#resume(org.apache.qpid.framing.ChannelResumeBody) + */ + public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException + { + _lock.lock(); + try + { + _channelOkBody = null; + checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _channelNotResumed.await(); + checkIfChannelClosed(); + AMQPValidator.throwExceptionOnNull(_channelOkBody, + "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time"); + notifyState(AMQPState.CHANNEL_OPENED); + _currentState = AMQPState.CHANNEL_OPENED; + return _channelOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in channel.resume", e); + } + finally + { + _lock.unlock(); + } + } + + /** + * ------------------------------------------- + * AMQPMethodListener methods + * -------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + _lock.lock(); + try + { + if (evt.getMethod() instanceof ChannelOpenOkBody) + { + _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod(); + _channelNotOpend.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelCloseOkBody) + { + _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod(); + _channelNotClosed.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelCloseBody) + { + _channelCloseBody = (ChannelCloseBody) evt.getMethod(); + // release the correct lock as u may have some conditions waiting. + // while an error occured and the broker has sent a close. + releaseLocks(); + handleChannelClose(_channelCloseBody); + return true; + } + else if (evt.getMethod() instanceof ChannelFlowOkBody) + { + _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod(); + _channelFlowNotResponded.signal(); + return true; + } + else if (evt.getMethod() instanceof ChannelFlowBody) + { + handleChannelFlow((ChannelFlowBody) evt.getMethod()); + return true; + } + else if (evt.getMethod() instanceof ChannelOkBody) + { + _channelOkBody = (ChannelOkBody) evt.getMethod(); + // In this case the only method expecting channel-ok is channel-resume + // haven't implemented ping and pong. + _channelNotResumed.signal(); + return true; + } + else + { + return false; + } + } + finally + { + _lock.unlock(); + } + } + + private void handleChannelClose(ChannelCloseBody channelCloseBody) throws AMQPException + { + notifyState(AMQPState.CHANNEL_CLOSED); + _currentState = AMQPState.CHANNEL_CLOSED; + // handle channel related cleanup + } + + private void releaseLocks() + { + if (_currentState == AMQPState.CHANNEL_NOT_OPENED) + { + _channelNotOpend.signal(); + _channelNotResumed.signal(); // It could be a channel.resume call + } + else if (_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND) + { + _channelFlowNotResponded.signal(); + } + else if (_currentState == AMQPState.CHANNEL_CLOSED) + { + _channelNotResumed.signal(); + } + } + + private void checkIfChannelClosed() throws AMQPException + { + if (_channelCloseBody != null) + { + String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() + " with reply code (" + + _channelCloseBody.getReplyCode() + ") " + "caused by class " + _channelCloseBody.getClassId() + " and method " + + _channelCloseBody.getMethod(); + + throw new AMQPException(error); + } + } + + private void handleChannelFlow(ChannelFlowBody channelFlowBody)throws AMQPException + { + _lock.lock(); + try + { + handleChannelFlowState(channelFlowBody.active); + } + finally + { + _lock.unlock(); + } + } + + private void handleChannelFlowState(boolean flow)throws AMQPException + { + notifyState((flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND); + _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND; + } + + private void notifyState(AMQPState newState) throws AMQPException + { + _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE)); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java new file mode 100644 index 0000000000..809aa57dab --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java @@ -0,0 +1,286 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.qpid; + +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ChannelOkBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.ExchangeDeleteOkBody; +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.framing.MessageOffsetBody; +import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageQosBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageRejectBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.QueuePurgeOkBody; +import org.apache.qpid.framing.QueueUnbindOkBody; +import org.apache.qpid.nclient.amqp.AMQPChannel; +import org.apache.qpid.nclient.amqp.AMQPClassFactory; +import org.apache.qpid.nclient.amqp.AMQPConnection; +import org.apache.qpid.nclient.amqp.AMQPExchange; +import org.apache.qpid.nclient.amqp.AMQPMessage; +import org.apache.qpid.nclient.amqp.AMQPMessageCallBack; +import org.apache.qpid.nclient.amqp.AMQPQueue; +import org.apache.qpid.nclient.amqp.event.AMQPEventManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.DefaultPhaseContext; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.PhaseContext; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.transport.AMQPConnectionURL; +import org.apache.qpid.nclient.transport.ConnectionURL; +import org.apache.qpid.nclient.transport.TransportConnection; +import org.apache.qpid.nclient.transport.TransportConnectionFactory; +import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; +import org.apache.qpid.url.URLSyntaxException; + +/** + * The Class Factory creates AMQP Class + * equivalents defined in the spec. + * + * There should one instance per connection. + * The factory class creates all the support + * classes and provides an instance of the + * AMQP class in ready-to-use state. + * + */ +public class QpidAMQPClassFactory implements AMQPClassFactory +{ + //Need an event manager per connection + private AMQPEventManager _eventManager = new QpidEventManager(); + + // Need a state manager per connection + private AMQPStateManager _stateManager = new QpidStateManager(); + + //Need a phase pipe per connection + private Phase _phase; + + //One instance per connection + private QpidAMQPConnection _amqpConnection; + + public QpidAMQPClassFactory() + { + + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createConnection(java.lang.String, org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType) + */ + public AMQPConnection createConnectionClass(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException + { + AMQPConnectionURL url = new AMQPConnectionURL(urlStr); + return createConnectionClass(url, type); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createConnectionClass(org.apache.qpid.nclient.transport.ConnectionURL, org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType) + */ + public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException + { + if (_amqpConnection == null) + { + PhaseContext ctx = new DefaultPhaseContext(); + ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager); + + TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx); + _amqpConnection = new QpidAMQPConnection(conn, _stateManager); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection); + _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection); + } + return _amqpConnection; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createChannelClass(int) + */ + public AMQPChannel createChannelClass(int channel) throws AMQPException + { + checkIfConnectionStarted(); + QpidAMQPChannel amqpChannel = new QpidAMQPChannel(channel, _phase,_stateManager); + _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelCloseBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelFlowBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel); + _eventManager.addMethodEventListener(channel, ChannelOkBody.class, amqpChannel); + return amqpChannel; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyChannelClass(int, org.apache.qpid.nclient.amqp.AMQPChannel) + */ + public void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException + { + _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel); + _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createExchangeClass(int) + */ + public AMQPExchange createExchangeClass(int channel) throws AMQPException + { + checkIfConnectionStarted(); + QpidAMQPExchange amqpExchange = new QpidAMQPExchange(channel, _phase); + _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange); + _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange); + return amqpExchange; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryExchangeClass(int, org.apache.qpid.nclient.amqp.AMQPExchange) + */ + public void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException + { + _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange); + _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createQueueClass(int) + */ + public AMQPQueue createQueueClass(int channel) throws AMQPException + { + checkIfConnectionStarted(); + QpidAMQPQueue amqpQueue = new QpidAMQPQueue(channel, _phase); + _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue); + _eventManager.addMethodEventListener(channel, QueueBindOkBody.class, amqpQueue); + _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue); + _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue); + _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue); + return amqpQueue; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyQueueClass(int, org.apache.qpid.nclient.amqp.AMQPQueue) + */ + public void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException + { + _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue); + _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue); + _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue); + _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue); + _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessageCallBack) + */ + public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException + { + checkIfConnectionStarted(); + QpidAMQPMessage amqpMessage = new QpidAMQPMessage(channel, _phase, messageCb); + _eventManager.addMethodEventListener(channel, MessageAppendBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageCancelBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageCloseBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageGetBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageOffsetBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageOkBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageOpenBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageRecoverBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageRejectBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageResumeBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageQosBody.class, amqpMessage); + _eventManager.addMethodEventListener(channel, MessageTransferBody.class, amqpMessage); + + return amqpMessage; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessage) + */ + public void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException + { + _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage); + _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage); + } + + //This class should register as a state listener for AMQPConnection + private void checkIfConnectionStarted() throws AMQPException + { + if (_phase == null) + { + _phase = _amqpConnection.getPhasePipe(); + + if (_phase == null) + { + throw new AMQPException("Cannot create a channel until connection is ready"); + } + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#getEventManager() + */ + public AMQPEventManager getEventManager() + { + return _eventManager; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#getStateManager() + */ + public AMQPStateManager getStateManager() + { + return _stateManager; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java new file mode 100644 index 0000000000..9b4d776cc5 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java @@ -0,0 +1,448 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.qpid; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; +import org.apache.qpid.framing.ConnectionOpenBody; +import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.nclient.amqp.AMQPConnection; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; +import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.transport.TransportConnection; +import org.apache.qpid.nclient.util.AMQPValidator; + +/** + * This maps directly to the Connection class defined in the AMQP protocol This class is a finite state machine and is + * thread safe by design A particular method (state changing) can only be invoked once and only in sequence or else an + * IllegalStateTransitionException will be thrown Also only one thread can enter those methods at a given time. + */ +public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodListener, AMQPConnection +{ + private static final Logger _logger = Logger.getLogger(QpidAMQPConnection.class); + + private Phase _phase; + + private TransportConnection _connection; + + private long _correlationId; + + private AMQPState _currentState; + + private AMQPStateManager _stateManager; + + private final AMQPState[] _validCloseStates = new AMQPState[] + { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, + AMQPState.CONNECTION_OPEN, }; + + // The wait period until a server sends a respond + private long _serverTimeOut = 1000; + + private final Lock _lock = new ReentrantLock(); + + private final Condition _connectionNotStarted = _lock.newCondition(); + + private final Condition _connectionNotSecure = _lock.newCondition(); + + private final Condition _connectionNotTuned = _lock.newCondition(); + + private final Condition _connectionNotOpened = _lock.newCondition(); + + private final Condition _connectionNotClosed = _lock.newCondition(); + + private ConnectionStartBody _connectionStartBody; + + private ConnectionSecureBody _connectionSecureBody; + + private ConnectionTuneBody _connectionTuneBody; + + private ConnectionOpenOkBody _connectionOpenOkBody; + + private ConnectionCloseOkBody _connectionCloseOkBody; + + private ConnectionCloseBody _connectionCloseBody; + + protected QpidAMQPConnection(TransportConnection connection, AMQPStateManager stateManager) + { + _connection = connection; + _stateManager = stateManager; + _currentState = AMQPState.CONNECTION_UNDEFINED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + } + + /** + * ------------------------------------------- + * API Methods + * -------------------------------------------- + */ + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPConnection#openTCPConnection() + */ + public ConnectionStartBody openTCPConnection() throws AMQPException + { + _lock.lock(); + // open the TCP connection + try + { + _connectionStartBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED); + _phase = _connection.connect(); + + // waiting for ConnectionStartBody or error in connection + //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotStarted.await(); + + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time"); + notifyState(AMQPState.CONNECTION_NOT_STARTED); + _currentState = AMQPState.CONNECTION_NOT_STARTED; + return _connectionStartBody; + } + catch (InterruptedException e) + { + throw new AMQPException("Error opening connection to broker", e); + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPConnection#startOk(org.apache.qpid.framing.ConnectionStartOkBody) + */ + public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException + { + _lock.lock(); + try + { + _connectionSecureBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId); + _phase.messageSent(msg); + // _connectionNotSecure.await(_serverTimeOut,TimeUnit.MILLISECONDS); + _connectionNotSecure.await(); + + checkIfConnectionClosed(); + if (_connectionTuneBody != null) + { + notifyState(AMQPState.CONNECTION_NOT_TUNED); + _currentState = AMQPState.CONNECTION_NOT_TUNED; + return _connectionTuneBody; + } + else if (_connectionSecureBody != null) + { // oops the server sent another challenge + notifyState(AMQPState.CONNECTION_NOT_SECURE); + _currentState = AMQPState.CONNECTION_NOT_SECURE; + return _connectionSecureBody; + } + else + { + throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); + } + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.startOk", e); + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPConnection#secureOk(org.apache.qpid.framing.ConnectionSecureOkBody) + */ + public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException + { + _lock.lock(); + try + { + _connectionTuneBody = null; + _connectionSecureBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED); + + _connectionSecureBody = null; // The server could send a fresh challenge + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId); + _phase.messageSent(msg); + + //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotTuned.await(); + checkIfConnectionClosed(); + + if (_connectionTuneBody != null) + { + notifyState(AMQPState.CONNECTION_NOT_TUNED); + _currentState = AMQPState.CONNECTION_NOT_TUNED; + return _connectionTuneBody; + } + else if (_connectionSecureBody != null) + { // oops the server sent another challenge + notifyState(AMQPState.CONNECTION_NOT_SECURE); + _currentState = AMQPState.CONNECTION_NOT_SECURE; + return _connectionSecureBody; + } + else + { + throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time"); + } + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.secureOk", e); + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPConnection#tuneOk(org.apache.qpid.framing.ConnectionTuneOkBody) + */ + public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException + { + _lock.lock(); + try + { + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED); + _connectionSecureBody = null; + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId); + _phase.messageSent(msg); + notifyState(AMQPState.CONNECTION_NOT_OPENED); + _currentState = AMQPState.CONNECTION_NOT_OPENED; + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPConnection#open(org.apache.qpid.framing.ConnectionOpenBody) + */ + public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException + { + _lock.lock(); + try + { + // If the broker sends a connection close due to an error with the + // Connection tune ok, then this call will verify that + checkIfConnectionClosed(); + + _connectionOpenOkBody = null; + checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _connectionNotOpened.await(); + + checkIfConnectionClosed(); + AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time"); + notifyState(AMQPState.CONNECTION_OPEN); + _currentState = AMQPState.CONNECTION_OPEN; + return _connectionOpenOkBody; + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.open", e); + } + finally + { + _lock.unlock(); + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPConnection#close(org.apache.qpid.framing.ConnectionCloseBody) + */ + public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException + { + _lock.lock(); + try + { + _connectionCloseOkBody = null; + checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED); + AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS); + AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time"); + notifyState(AMQPState.CONNECTION_CLOSED); + _currentState = AMQPState.CONNECTION_CLOSED; + return _connectionCloseOkBody; + } + catch (InterruptedException e) + { + throw new AMQPException("Error in connection.close", e); + } + finally + { + _lock.unlock(); + } + } + + /** + * ------------------------------------------- + * AMQMethodListener methods + * -------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + _lock.lock(); + try + { + _correlationId = evt.getCorrelationId(); + + if (evt.getMethod() instanceof ConnectionStartBody) + { + _connectionStartBody = (ConnectionStartBody) evt.getMethod(); + _connectionNotStarted.signalAll(); + return true; + } + else if (evt.getMethod() instanceof ConnectionSecureBody) + { + _connectionSecureBody = (ConnectionSecureBody) evt.getMethod(); + _connectionNotSecure.signal(); + _connectionNotTuned.signal(); // in case the server has sent another chanllenge + return true; + } + else if (evt.getMethod() instanceof ConnectionTuneBody) + { + _connectionTuneBody = (ConnectionTuneBody) evt.getMethod(); + _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk + _connectionNotTuned.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionOpenOkBody) + { + _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod(); + _connectionNotOpened.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseOkBody) + { + _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod(); + _connectionNotClosed.signal(); + return true; + } + else if (evt.getMethod() instanceof ConnectionCloseBody) + { + _connectionCloseBody = (ConnectionCloseBody) evt.getMethod(); + // release the correct lock as u may have some conditions waiting. + // while an error occured and the broker has sent a close. + releaseLocks(); + handleClose(); + return true; + } + else + { + return false; + } + } + finally + { + _lock.unlock(); + } + } + + + public Phase getPhasePipe() + { + return _phase; + } + + private void handleClose() throws AMQPException + { + try + { + notifyState(AMQPState.CONNECTION_CLOSING); + _currentState = AMQPState.CONNECTION_CLOSING; + // do the required cleanup and send a ConnectionCloseOkBody + } + catch (Exception e) + { + throw new AMQPException("Error handling connection.close from broker", e); + } + } + + private void checkIfConnectionClosed() throws AMQPException + { + if (_connectionCloseBody != null) + { + String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() + " with reply code (" + + _connectionCloseBody.getReplyCode() + ") " + "caused by class " + _connectionCloseBody.getClassId() + " and method " + + _connectionCloseBody.getMethod(); + + throw new AMQPException(error); + } + } + + private void releaseLocks() + { + if (_currentState == AMQPState.CONNECTION_NOT_OPENED) + { + _connectionNotOpened.signal(); + } + else if (_currentState == AMQPState.CONNECTION_UNDEFINED) + { + _connectionNotStarted.signal(); + } + else if (_currentState == AMQPState.CONNECTION_NOT_STARTED) + { + _connectionNotSecure.signal(); + } + else if (_currentState == AMQPState.CONNECTION_NOT_SECURE) + { + _connectionNotTuned.signal(); + } + } + + private void notifyState(AMQPState newState) throws AMQPException + { + _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CONNECTION_STATE)); + } + +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java new file mode 100644 index 0000000000..70ea6bd27d --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxCoordination.java @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.nclient.amqp.qpid; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.DtxCoordinationCommitBody; +import org.apache.qpid.framing.DtxCoordinationCommitOkBody; +import org.apache.qpid.framing.DtxCoordinationForgetBody; +import org.apache.qpid.framing.DtxCoordinationForgetOkBody; +import org.apache.qpid.framing.DtxCoordinationGetTimeoutBody; +import org.apache.qpid.framing.DtxCoordinationGetTimeoutOkBody; +import org.apache.qpid.framing.DtxCoordinationPrepareBody; +import org.apache.qpid.framing.DtxCoordinationPrepareOkBody; +import org.apache.qpid.framing.DtxCoordinationRecoverBody; +import org.apache.qpid.framing.DtxCoordinationRecoverOkBody; +import org.apache.qpid.framing.DtxCoordinationRollbackBody; +import org.apache.qpid.framing.DtxCoordinationSetTimeoutBody; +import org.apache.qpid.nclient.amqp.AMQPCallBack; +import org.apache.qpid.nclient.amqp.AMQPCallBackSupport; +import org.apache.qpid.nclient.amqp.AMQPDtxCoordination; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; + +public class QpidAMQPDtxCoordination extends AMQPCallBackSupport implements AMQPMethodListener, AMQPDtxCoordination +{ + private Phase _phase; + + protected QpidAMQPDtxCoordination(int channelId,Phase phase) + { + super(channelId); + _phase = phase; + } + + public void commit(DtxCoordinationCommitBody dtxCoordinationCommitBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationCommitBody,cb); + _phase.messageSent(msg); + } + + public void forget(DtxCoordinationForgetBody dtxCoordinationForgetBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationForgetBody,cb); + _phase.messageSent(msg); + } + + public void getTimeOut(DtxCoordinationGetTimeoutBody dtxCoordinationGetTimeoutBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationGetTimeoutBody,cb); + _phase.messageSent(msg); + } + + public void rollback(DtxCoordinationRollbackBody dtxCoordinationRollbackBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationRollbackBody,cb); + _phase.messageSent(msg); + } + + public void prepare(DtxCoordinationPrepareBody dtxCoordinationPrepareBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationPrepareBody,cb); + _phase.messageSent(msg); + } + + public void recover(DtxCoordinationRecoverBody dtxCoordinationRecoverBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationRecoverBody,cb); + _phase.messageSent(msg); + } + + public void setTimeOut(DtxCoordinationSetTimeoutBody dtxCoordinationSetTimeoutBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(dtxCoordinationSetTimeoutBody,cb); + _phase.messageSent(msg); + } + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + long localCorrelationId = evt.getLocalCorrelationId(); + AMQMethodBody methodBody = evt.getMethod(); + if ( methodBody instanceof DtxCoordinationCommitOkBody || + methodBody instanceof DtxCoordinationForgetOkBody || + methodBody instanceof DtxCoordinationGetTimeoutOkBody || + methodBody instanceof DtxCoordinationPrepareOkBody || + methodBody instanceof DtxCoordinationRecoverOkBody + ) + { + invokeCallBack(localCorrelationId,methodBody); + return true; + } + else + { + return false; + } + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java new file mode 100644 index 0000000000..fd7b6f8ec6 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java @@ -0,0 +1,224 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.qpid; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.DtxDemarcationEndBody; +import org.apache.qpid.framing.DtxDemarcationEndOkBody; +import org.apache.qpid.framing.DtxDemarcationSelectBody; +import org.apache.qpid.framing.DtxDemarcationSelectOkBody; +import org.apache.qpid.framing.DtxDemarcationStartBody; +import org.apache.qpid.framing.DtxDemarcationStartOkBody; +import org.apache.qpid.nclient.amqp.AMQPDtxDemarcation; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; +import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.util.AMQPValidator; + +public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMethodListener, AMQPDtxDemarcation +{ + private static final Logger _logger = Logger.getLogger(QpidAMQPDtxDemarcation.class); + + // the channelId that will be used for transactions + private int _channelId; + + private Phase _phase; + + private AMQPState _currentState; + + private AMQPStateManager _stateManager; + + private final AMQPState[] _validEndStates = new AMQPState[] + { AMQPState.DTX_STARTED }; + + private final AMQPState[] _validStartStates = new AMQPState[] + { AMQPState.DTX_NOT_STARTED, AMQPState.DTX_END }; + + // The wait period until a server sends a respond + private long _serverTimeOut = 1000; + + private final Lock _lock = new ReentrantLock(); + + private final Condition _dtxNotSelected = _lock.newCondition(); + + private final Condition _dtxNotStarted = _lock.newCondition(); + + // maybe it needs a better name + private final Condition _dtxNotEnd = _lock.newCondition(); + + private DtxDemarcationSelectOkBody _dtxDemarcationSelectOkBody; + + private DtxDemarcationStartOkBody _dtxDemarcationStartOkBody; + + private DtxDemarcationEndOkBody _dtxDemarcationEndOkBody; + + protected QpidAMQPDtxDemarcation(int channelId, Phase phase, AMQPStateManager stateManager) + { + _channelId = channelId; + _phase = phase; + _stateManager = stateManager; + _currentState = AMQPState.DTX_CHANNEL_NOT_SELECTED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + } + + /** + * ------------------------------------------- + * API Methods + * -------------------------------------------- + */ + public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException + { + _lock.lock(); + try + { + _dtxDemarcationSelectOkBody = null; + checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, AMQPState.DTX_NOT_STARTED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, dtxDemarcationSelectBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_dtxNotSelected.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _dtxNotSelected.await(); + AMQPValidator.throwExceptionOnNull(_dtxDemarcationSelectOkBody, "The broker didn't send the DtxDemarcationSelectOkBody in time"); + notifyState(AMQPState.DTX_NOT_STARTED); + _currentState = AMQPState.DTX_NOT_STARTED; + return _dtxDemarcationSelectOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in dtx.select", e); + } + finally + { + _lock.unlock(); + } + } + + public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException + { + _lock.lock(); + try + { + _dtxDemarcationStartOkBody = null; + checkIfValidStateTransition(_validStartStates, _currentState, AMQPState.DTX_STARTED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationStartOkBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_dtxNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _dtxNotStarted.await(); + AMQPValidator.throwExceptionOnNull(_dtxDemarcationStartOkBody, "The broker didn't send the DtxDemarcationStartOkBody in time"); + notifyState(AMQPState.DTX_STARTED); + _currentState = AMQPState.DTX_STARTED; + return _dtxDemarcationStartOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in dtx.start", e); + } + finally + { + _lock.unlock(); + } + } + + public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException + { + _lock.lock(); + try + { + _dtxDemarcationEndOkBody = null; + checkIfValidStateTransition(_validEndStates, _currentState, AMQPState.DTX_END); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationEndOkBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_dtxNotEnd.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _dtxNotEnd.await(); + AMQPValidator.throwExceptionOnNull(_dtxDemarcationEndOkBody, "The broker didn't send the DtxDemarcationEndOkBody in time"); + notifyState(AMQPState.DTX_END); + _currentState = AMQPState.DTX_END; + return _dtxDemarcationEndOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in dtx.start", e); + } + finally + { + _lock.unlock(); + } + } + + /** + * ------------------------------------------- + * AMQPMethodListener methods + * -------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + _lock.lock(); + try + { + if (evt.getMethod() instanceof DtxDemarcationSelectOkBody) + { + _dtxDemarcationEndOkBody = (DtxDemarcationEndOkBody) evt.getMethod(); + _dtxNotSelected.signal(); + return true; + } + else if (evt.getMethod() instanceof DtxDemarcationStartOkBody) + { + _dtxDemarcationStartOkBody = (DtxDemarcationStartOkBody) evt.getMethod(); + _dtxNotStarted.signal(); + return true; + } + else if (evt.getMethod() instanceof DtxDemarcationEndOkBody) + { + _dtxDemarcationEndOkBody = (DtxDemarcationEndOkBody) evt.getMethod(); + _dtxNotEnd.signal(); + return true; + } + else + { + return false; + } + } + finally + { + _lock.unlock(); + } + } + + private void notifyState(AMQPState newState) throws AMQPException + { + _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE)); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java new file mode 100644 index 0000000000..02b22e0755 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.nclient.amqp.qpid; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareOkBody; +import org.apache.qpid.framing.ExchangeDeleteBody; +import org.apache.qpid.framing.ExchangeDeleteOkBody; +import org.apache.qpid.nclient.amqp.AMQPCallBack; +import org.apache.qpid.nclient.amqp.AMQPCallBackSupport; +import org.apache.qpid.nclient.amqp.AMQPExchange; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; + +/** + * + * This class represents the Exchange class defined in AMQP. + * Each method takes an @see AMQPCallBack object if it wants to know + * the response from the broker to particular method. + * Clients can handle the reponse asynchronously or block for a response + * using AMQPCallBack.isComplete() periodically using a loop. + */ +public class QpidAMQPExchange extends AMQPCallBackSupport implements AMQPMethodListener, AMQPExchange +{ + private Phase _phase; + + protected QpidAMQPExchange(int channelId,Phase phase) + { + super(channelId); + _phase = phase; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPExchange#declare(org.apache.qpid.framing.ExchangeDeclareBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void declare(ExchangeDeclareBody exchangeDeclareBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(exchangeDeclareBody.nowait,exchangeDeclareBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPExchange#delete(org.apache.qpid.framing.ExchangeDeleteBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void delete(ExchangeDeleteBody exchangeDeleteBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(exchangeDeleteBody.nowait,exchangeDeleteBody,cb); + _phase.messageSent(msg); + } + + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + long localCorrelationId = evt.getLocalCorrelationId(); + AMQMethodBody methodBody = evt.getMethod(); + if ( methodBody instanceof ExchangeDeclareOkBody || methodBody instanceof ExchangeDeleteOkBody) + { + invokeCallBack(localCorrelationId,methodBody); + return true; + } + else + { + return false; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java new file mode 100644 index 0000000000..e40c3cefa2 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java @@ -0,0 +1,256 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.qpid; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageConsumeBody; +import org.apache.qpid.framing.MessageEmptyBody; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.framing.MessageOffsetBody; +import org.apache.qpid.framing.MessageOkBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageQosBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageRejectBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.nclient.amqp.AMQPCallBack; +import org.apache.qpid.nclient.amqp.AMQPCallBackSupport; +import org.apache.qpid.nclient.amqp.AMQPMessage; +import org.apache.qpid.nclient.amqp.AMQPMessageCallBack; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; + +/** + * This class represents the AMQP Message class. + * You need an instance of this class per channel. + * A @see AMQPMessageCallBack class is taken as an argument in the constructor. + * A client can use this class to issue Message class methods on the broker. + * When the broker issues Message class methods on the client, the client is notified + * via the AMQPMessageCallBack interface. + * + * A JMS Message producer implementation can wrap an instance if this and map + * JMS method calls to the appropriate AMQP methods. + * + * AMQPMessageCallBack can be implemented by the JMS MessageConsumer implementation. + * + */ +public class QpidAMQPMessage extends AMQPCallBackSupport implements AMQPMethodListener, AMQPMessage +{ + private Phase _phase; + private AMQPMessageCallBack _messageCb; + + protected QpidAMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb) + { + super(channelId); + _phase = phase; + _messageCb = messageCb; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#transfer(org.apache.qpid.framing.MessageTransferBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + + public void transfer(MessageTransferBody messageTransferBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageTransferBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#consume(org.apache.qpid.framing.MessageConsumeBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void consume(MessageConsumeBody messageConsumeBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageConsumeBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#cancel(org.apache.qpid.framing.MessageCancelBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void cancel(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#get(org.apache.qpid.framing.MessageGetBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void get(MessageGetBody messageGetBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageGetBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#recover(org.apache.qpid.framing.MessageRecoverBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void recover(MessageRecoverBody messageRecoverBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageRecoverBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#open(org.apache.qpid.framing.MessageOpenBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void open(MessageOpenBody messageOpenBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageOpenBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#close(org.apache.qpid.framing.MessageCloseBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void close(MessageCloseBody messageCloseBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageCloseBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#append(org.apache.qpid.framing.MessageAppendBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void append(MessageAppendBody messageAppendBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageAppendBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#checkpoint(org.apache.qpid.framing.MessageCheckpointBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void checkpoint(MessageCheckpointBody messageCheckpointBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageCheckpointBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#resume(org.apache.qpid.framing.MessageResumeBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void resume(MessageResumeBody messageResumeBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageResumeBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#qos(org.apache.qpid.framing.MessageQosBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void qos(MessageQosBody messageQosBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(messageQosBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#ok(org.apache.qpid.framing.MessageOkBody, long) + */ + public void ok(MessageOkBody messageOkBody,long correlationId) throws AMQPException + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOkBody,correlationId); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#reject(org.apache.qpid.framing.MessageRejectBody, long) + */ + public void reject(MessageRejectBody messageRejectBody,long correlationId) throws AMQPException + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageRejectBody,correlationId); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPMessage#offset(org.apache.qpid.framing.MessageOffsetBody, long) + */ + public void offset(MessageOffsetBody messageOffsetBody,long correlationId) throws AMQPException + { + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOffsetBody,correlationId); + _phase.messageSent(msg); + } + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + long localCorrelationId = evt.getLocalCorrelationId(); + AMQMethodBody methodBody = evt.getMethod(); + if ( methodBody instanceof MessageOkBody || + methodBody instanceof MessageRejectBody || + methodBody instanceof MessageEmptyBody) + { + invokeCallBack(localCorrelationId,methodBody); + return true; + } + else if (methodBody instanceof MessageTransferBody) + { + _messageCb.transfer((MessageTransferBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageAppendBody) + { + _messageCb.append((MessageAppendBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageOpenBody) + { + _messageCb.open((MessageOpenBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageCloseBody) + { + _messageCb.close((MessageCloseBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageCheckpointBody) + { + _messageCb.checkpoint((MessageCheckpointBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageRecoverBody) + { + _messageCb.recover((MessageRecoverBody)methodBody, evt.getCorrelationId()); + return true; + } + else if (methodBody instanceof MessageResumeBody) + { + _messageCb.resume((MessageResumeBody)methodBody, evt.getCorrelationId()); + return true; + } + else + { + return false; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java new file mode 100644 index 0000000000..323ff0cf06 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.qpid; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueBindOkBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.QueuePurgeBody; +import org.apache.qpid.framing.QueuePurgeOkBody; +import org.apache.qpid.framing.QueueUnbindBody; +import org.apache.qpid.framing.QueueUnbindOkBody; +import org.apache.qpid.nclient.amqp.AMQPCallBack; +import org.apache.qpid.nclient.amqp.AMQPCallBackSupport; +import org.apache.qpid.nclient.amqp.AMQPQueue; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; + +/** + * + * This class represents the Queue class defined in AMQP. + * Each method takes an @see AMQPCallBack object if it wants to know + * the response from the broker to a particular method. + * Clients can handle the reponse asynchronously or block for a response + * using AMQPCallBack.isComplete() periodically using a loop. + */ +public class QpidAMQPQueue extends AMQPCallBackSupport implements AMQPMethodListener, AMQPQueue +{ + private Phase _phase; + + protected QpidAMQPQueue(int channelId,Phase phase) + { + super(channelId); + _phase = phase; + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPQueue#declare(org.apache.qpid.framing.QueueDeclareBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void declare(QueueDeclareBody queueDeclareBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queueDeclareBody.nowait,queueDeclareBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPQueue#bind(org.apache.qpid.framing.QueueBindBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void bind(QueueBindBody queueBindBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queueBindBody.nowait,queueBindBody,cb); + _phase.messageSent(msg); + } + + // Queue.unbind doesn't have nowait + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPQueue#unbind(org.apache.qpid.framing.QueueUnbindBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void unbind(QueueUnbindBody queueUnbindBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleAsynchronousCall(queueUnbindBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPQueue#purge(org.apache.qpid.framing.QueuePurgeBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void purge(QueuePurgeBody queuePurgeBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queuePurgeBody.nowait,queuePurgeBody,cb); + _phase.messageSent(msg); + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.amqp.AMQPQueue#delete(org.apache.qpid.framing.QueueDeleteBody, org.apache.qpid.nclient.amqp.AMQPCallBack) + */ + public void delete(QueueDeleteBody queueDeleteBody,AMQPCallBack cb) throws AMQPException + { + AMQPMethodEvent msg = handleNoWait(queueDeleteBody.nowait,queueDeleteBody,cb); + _phase.messageSent(msg); + } + + + /**------------------------------------------- + * AMQPMethodListener methods + *-------------------------------------------- + */ + public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException + { + long localCorrelationId = evt.getLocalCorrelationId(); + AMQMethodBody methodBody = evt.getMethod(); + if ( methodBody instanceof QueueDeclareOkBody || + methodBody instanceof QueueBindOkBody || + methodBody instanceof QueueUnbindOkBody || + methodBody instanceof QueuePurgeOkBody || + methodBody instanceof QueueDeleteOkBody + ) + { + invokeCallBack(localCorrelationId,methodBody); + return true; + } + else + { + return false; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java new file mode 100644 index 0000000000..902c80fe77 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.qpid; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.nclient.amqp.event.AMQPEventManager; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; +import org.apache.qpid.nclient.core.AMQPException; + +/** + * This class registeres with the ModelPhase as a AMQMethodListener, + * to receive method events and then it distributes methods to other listerners + * using a filtering criteria. The criteria is channel id and method body class. + * The method listeners are added and removed dynamically + * + * <p/> + */ +public class QpidEventManager implements AMQPEventManager +{ + private static final Logger _logger = Logger.getLogger(QpidEventManager.class); + + private Map<Integer, Map> _channelMap = new ConcurrentHashMap<Integer, Map>(); + + /** + * ------------------------------------------------ + * methods introduced by AMQEventManager + * ------------------------------------------------ + */ + public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l) + { + Map<Class, List> _methodListenerMap; + if (_channelMap.containsKey(channelId)) + { + _methodListenerMap = _channelMap.get(channelId); + + } + else + { + _methodListenerMap = new ConcurrentHashMap<Class, List>(); + _channelMap.put(channelId, _methodListenerMap); + } + + List<AMQPMethodListener> _listeners; + if (_methodListenerMap.containsKey(clazz)) + { + _listeners = _methodListenerMap.get(clazz); + } + else + { + _listeners = new ArrayList<AMQPMethodListener>(); + _methodListenerMap.put(clazz, _listeners); + } + + _listeners.add(l); + + } + + public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l) + { + if (_channelMap.containsKey(channelId)) + { + Map<Class, List> _methodListenerMap = _channelMap.get(channelId); + + if (_methodListenerMap.containsKey(clazz)) + { + List<AMQPMethodListener> _listeners = _methodListenerMap.get(clazz); + _listeners.remove(l); + } + + } + } + + /* (non-Javadoc) + * @see org.apache.qpid.nclient.model.AMQStateManager#methodReceived(org.apache.qpid.protocol.AMQMethodEvent) + */ + public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException + { + if (_channelMap.containsKey(evt.getChannelId())) + { + Map<Class, List> _methodListenerMap = _channelMap.get(evt.getChannelId()); + + if (_methodListenerMap.containsKey(evt.getMethod().getClass())) + { + + List<AMQPMethodListener> _listeners = _methodListenerMap.get(evt.getMethod().getClass()); + for (AMQPMethodListener l : _listeners) + { + l.methodReceived(evt); + } + + return (_listeners.size() > 0); + } + + } + + return false; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java new file mode 100644 index 0000000000..3fe1ee4cfd --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.qpid; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; +import org.apache.qpid.nclient.amqp.state.AMQPStateListener; +import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; +import org.apache.qpid.nclient.core.AMQPException; + +public class QpidStateManager implements AMQPStateManager +{ + + private static final Logger _logger = Logger.getLogger(QpidStateManager.class); + + private Map<AMQPStateType, List<AMQPStateListener>> _listernerMap = new ConcurrentHashMap<AMQPStateType, List<AMQPStateListener>>(); + + public void addListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException + { + List<AMQPStateListener> list; + if(_listernerMap.containsKey(stateType)) + { + list = _listernerMap.get(stateType); + } + else + { + list = new ArrayList<AMQPStateListener>(); + _listernerMap.put(stateType, list); + } + list.add(l); + } + + public void removeListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException + { + if(_listernerMap.containsKey(stateType)) + { + List<AMQPStateListener> list = _listernerMap.get(stateType); + list.remove(l); + } + } + + public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException + { + + if(_listernerMap.containsKey(event.getStateType())) + { + List<AMQPStateListener> list = _listernerMap.get(event.getStateType()); + for(AMQPStateListener l: list) + { + l.stateChanged(event); + } + } + else + { + _logger.warn("There are no registered listerners for state type" + event.getStateType()); + } + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java new file mode 100644 index 0000000000..31a0197ab1 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.sample; + +import org.apache.qpid.framing.MessageAppendBody; +import org.apache.qpid.framing.MessageCheckpointBody; +import org.apache.qpid.framing.MessageCloseBody; +import org.apache.qpid.framing.MessageOpenBody; +import org.apache.qpid.framing.MessageRecoverBody; +import org.apache.qpid.framing.MessageResumeBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.nclient.amqp.AMQPMessageCallBack; +import org.apache.qpid.nclient.core.AMQPException; + +public class MessageHelper implements AMQPMessageCallBack +{ + + public void append(MessageAppendBody messageAppendBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void checkpoint(MessageCheckpointBody messageCheckpointBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void close(MessageCloseBody messageCloseBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void open(MessageOpenBody messageOpenBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void recover(MessageRecoverBody messageRecoverBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void resume(MessageResumeBody messageResumeBody, long correlationId) throws AMQPException + { + // TODO Auto-generated method stub + + } + + public void transfer(MessageTransferBody messageTransferBody, long correlationId) throws AMQPException + { + System.out.println("The Broker has sent a message" + messageTransferBody.toString()); + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java new file mode 100644 index 0000000000..908f0adee0 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/SecurityHelper.java @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.sample; + +import java.io.UnsupportedEncodingException; +import java.util.HashSet; +import java.util.StringTokenizer; + +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.security.AMQPCallbackHandler; +import org.apache.qpid.nclient.security.CallbackHandlerRegistry; +import org.apache.qpid.nclient.transport.ConnectionURL; + +public class SecurityHelper +{ + public static String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException + { + final String mechanisms = new String(availableMechanisms, "utf8"); + StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); + HashSet mechanismSet = new HashSet(); + while (tokenizer.hasMoreTokens()) + { + mechanismSet.add(tokenizer.nextToken()); + } + + String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms(); + StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " "); + while (prefTokenizer.hasMoreTokens()) + { + String mech = prefTokenizer.nextToken(); + if (mechanismSet.contains(mech)) + { + return mech; + } + } + return null; + } + + public static AMQPCallbackHandler createCallbackHandler(String mechanism, ConnectionURL url) + throws AMQPException + { + Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism); + try + { + Object instance = mechanismClass.newInstance(); + AMQPCallbackHandler cbh = (AMQPCallbackHandler) instance; + cbh.initialise(url); + return cbh; + } + catch (Exception e) + { + throw new AMQPException("Unable to create callback handler: " + e, e); + } + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java new file mode 100644 index 0000000000..04714d7278 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.sample; + +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; +import org.apache.qpid.nclient.amqp.state.AMQPStateListener; +import org.apache.qpid.nclient.core.AMQPException; + +public class StateHelper implements AMQPStateListener +{ + + public void stateChanged(AMQPStateChangedEvent event) throws AMQPException + { + String s = event.getStateType() + " changed state from " + + event.getOldState() + " to " + event.getNewState(); + + System.out.println(s); + + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java new file mode 100644 index 0000000000..3dce1cde1e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java @@ -0,0 +1,454 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.sample; + +import java.util.StringTokenizer; +import java.util.UUID; + +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; + +import org.apache.qpid.AMQException; +import org.apache.qpid.common.ClientProperties; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelFlowBody; +import org.apache.qpid.framing.ChannelFlowOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ChannelOpenOkBody; +import org.apache.qpid.framing.ConnectionOpenBody; +import org.apache.qpid.framing.ConnectionOpenOkBody; +import org.apache.qpid.framing.ConnectionSecureBody; +import org.apache.qpid.framing.ConnectionSecureOkBody; +import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionStartOkBody; +import org.apache.qpid.framing.ConnectionTuneBody; +import org.apache.qpid.framing.ConnectionTuneOkBody; +import org.apache.qpid.framing.Content; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; +import org.apache.qpid.framing.MessageCancelBody; +import org.apache.qpid.framing.MessageConsumeBody; +import org.apache.qpid.framing.MessageGetBody; +import org.apache.qpid.framing.MessageTransferBody; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.QueueDeclareOkBody; +import org.apache.qpid.framing.QueueDeleteBody; +import org.apache.qpid.framing.QueueDeleteOkBody; +import org.apache.qpid.framing.QueuePurgeBody; +import org.apache.qpid.framing.QueuePurgeOkBody; +import org.apache.qpid.nclient.amqp.AMQPCallBack; +import org.apache.qpid.nclient.amqp.AMQPChannel; +import org.apache.qpid.nclient.amqp.AMQPClassFactory; +import org.apache.qpid.nclient.amqp.AMQPConnection; +import org.apache.qpid.nclient.amqp.AMQPExchange; +import org.apache.qpid.nclient.amqp.AMQPMessage; +import org.apache.qpid.nclient.amqp.AMQPQueue; +import org.apache.qpid.nclient.amqp.AbstractAMQPClassFactory; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; +import org.apache.qpid.nclient.transport.AMQPConnectionURL; +import org.apache.qpid.nclient.transport.ConnectionURL; +import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType; + +/** + * This class illustrates the usage of the API + * Notes this is just a simple demo. + * + * I have used Helper classes to keep the code cleaner. + * Will break this into unit tests later on + */ + +@SuppressWarnings("unused") +public class TestClient +{ + private byte _major; + + private byte _minor; + + private ConnectionURL _url; + + private static int _channel = 2; + + // Need a Class factory per connection + private AMQPClassFactory _classFactory; + + private int _ticket; + + public AMQPConnection openConnection() throws Exception + { + _classFactory = AbstractAMQPClassFactory.getFactoryInstance(); + + //_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'"); + + _url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'"); + return _classFactory.createConnectionClass(_url, ConnectionType.TCP); + } + + public void handleConnectionNegotiation(AMQPConnection con) throws Exception + { + StateHelper stateHelper = new StateHelper(); + _classFactory.getStateManager().addListener(AMQPStateType.CONNECTION_STATE, stateHelper); + _classFactory.getStateManager().addListener(AMQPStateType.CHANNEL_STATE, stateHelper); + + //ConnectionStartBody + ConnectionStartBody connectionStartBody = con.openTCPConnection(); + _major = connectionStartBody.getMajor(); + _minor = connectionStartBody.getMinor(); + + FieldTable clientProperties = FieldTableFactory.newFieldTable(); + clientProperties.put(new AMQShortString(ClientProperties.instance.toString()), "Test"); // setting only the client id + + final String locales = new String(connectionStartBody.getLocales(), "utf8"); + final StringTokenizer tokenizer = new StringTokenizer(locales, " "); + + final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms()); + + SaslClient sc = Sasl.createSaslClient(new String[] + { mechanism }, null, "AMQP", "localhost", null, SecurityHelper.createCallbackHandler(mechanism, _url)); + + ConnectionStartOkBody connectionStartOkBody = ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, new AMQShortString( + tokenizer.nextToken()), new AMQShortString(mechanism), (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null)); + // ConnectionSecureBody + AMQMethodBody body = con.startOk(connectionStartOkBody); + ConnectionTuneBody connectionTuneBody; + + if (body instanceof ConnectionSecureBody) + { + ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody) body; + ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(_major, _minor, sc + .evaluateChallenge(connectionSecureBody.getChallenge())); + //Assuming the server is not going to send another challenge + connectionTuneBody = (ConnectionTuneBody) con.secureOk(connectionSecureOkBody); + + } + else + { + connectionTuneBody = (ConnectionTuneBody) body; + } + + // Using broker supplied values + ConnectionTuneOkBody connectionTuneOkBody = ConnectionTuneOkBody.createMethodBody(_major, _minor, connectionTuneBody.getChannelMax(), + connectionTuneBody.getFrameMax(), connectionTuneBody.getHeartbeat()); + con.tuneOk(connectionTuneOkBody); + + ConnectionOpenBody connectionOpenBody = ConnectionOpenBody.createMethodBody(_major, _minor, null, true, new AMQShortString(_url + .getVirtualHost())); + + ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody); + } + + public void handleChannelNegotiation() throws Exception + { + AMQPChannel channel = _classFactory.createChannelClass(_channel); + + ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1")); + ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody); + + //lets have some fun + ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false); + + ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody); + System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend")); + + channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true); + channelFlowOkBody = channel.flow(channelFlowBody); + System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend")); + } + + public void createExchange() throws Exception + { + AMQPExchange exchange = _classFactory.createExchangeClass(_channel); + + ExchangeDeclareBody exchangeDeclareBody = ExchangeDeclareBody.createMethodBody(_major, _minor, null, // arguments + false,//auto delete + false,// durable + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME), true, //internal + false,// nowait + false,// passive + _ticket, new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)); + + AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange"); + exchange.declare(exchangeDeclareBody, cb); + // Blocking for response + while (!cb.isComplete()) + { + } + } + + public void createAndBindQueue() throws Exception + { + AMQPQueue queue = _classFactory.createQueueClass(_channel); + + QueueDeclareBody queueDeclareBody = QueueDeclareBody.createMethodBody(_major, _minor, null, //arguments + false,//auto delete + false,// durable + false, //exclusive, + false, //nowait, + false, //passive, + new AMQShortString("MyTestQueue"), 0); + + AMQPCallBack cb = new AMQPCallBack() + { + + @Override + public void brokerResponded(AMQMethodBody body) + { + QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody) body; + System.out.println("[Broker has created the queue, " + "message count " + queueDeclareOkBody.getMessageCount() + "consumer count " + + queueDeclareOkBody.getConsumerCount() + "]\n"); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + queue.declare(queueDeclareBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + + QueueBindBody queueBindBody = QueueBindBody.createMethodBody(_major, _minor, null, //arguments + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange + false, //nowait + new AMQShortString("MyTestQueue"), //queue + new AMQShortString("RH"), //routingKey + 0 //ticket + ); + + cb = createCallBackWithMessage("Broker has bound the queue"); + queue.bind(queueBindBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + } + + public void purgeQueue() throws Exception + { + AMQPQueue queue = _classFactory.createQueueClass(_channel); + + QueuePurgeBody queuePurgeBody = QueuePurgeBody.createMethodBody(_major, _minor, false, //nowait + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + AMQPCallBack cb = new AMQPCallBack() + { + + @Override + public void brokerResponded(AMQMethodBody body) + { + QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody) body; + System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n"); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + queue.purge(queuePurgeBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + + } + + public void deleteQueue() throws Exception + { + AMQPQueue queue = _classFactory.createQueueClass(_channel); + + QueueDeleteBody queueDeleteBody = QueueDeleteBody.createMethodBody(_major, _minor, false, //ifEmpty + false, //ifUnused + false, //nowait + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + AMQPCallBack cb = new AMQPCallBack() + { + + @Override + public void brokerResponded(AMQMethodBody body) + { + QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody) body; + System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n"); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + queue.delete(queueDeleteBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + + } + + public void publishAndSubscribe() throws Exception + { + AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper()); + MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor, new AMQShortString("myClient"),// destination + false, //exclusive + null, //filter + false, //noAck, + false, //noLocal, + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume"); + message.consume(messageConsumeBody, cb); + //Blocking for response + while (!cb.isComplete()) + { + } + + // Sending 5 messages serially + for (int i = 0; i < 5; i++) + { + cb = createCallBackWithMessage("Broker has accepted msg " + i); + message.transfer(createMessages("Test" + i), cb); + while (!cb.isComplete()) + { + } + } + + MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient")); + + AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel"); + message.cancel(messageCancelBody, cb2); + + } + + private MessageTransferBody createMessages(String content) throws Exception + { + FieldTable headers = FieldTableFactory.newFieldTable(); + headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + ""); + + MessageTransferBody messageTransferBody = MessageTransferBody.createMethodBody(_major, _minor, new AMQShortString("testApp"), //appId + headers, //applicationHeaders + new Content(Content.TypeEnum.INLINE_T, content.getBytes()), //body + new AMQShortString(""), //contentEncoding, + new AMQShortString("text/plain"), //contentType + new AMQShortString("testApp"), //correlationId + (short) 1, //deliveryMode non persistant + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination + new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange + 0l, //expiration + false, //immediate + false, //mandatory + new AMQShortString(UUID.randomUUID().toString()), //messageId + (short) 0, //priority + false, //redelivered + new AMQShortString("RH"), //replyTo + new AMQShortString("RH"), //routingKey, + "abc".getBytes(), //securityToken + 0, //ticket + System.currentTimeMillis(), //timestamp + new AMQShortString(""), //transactionId + 0l, //ttl, + new AMQShortString("Hello") //userId + ); + + return messageTransferBody; + + } + + public void publishAndGet() throws Exception + { + AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper()); + AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5"); + + MessageGetBody messageGetBody = MessageGetBody.createMethodBody(_major, _minor, new AMQShortString("myClient"), false, //noAck + new AMQShortString("MyTestQueue"), //queue + 0 //ticket + ); + + //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper()); + message.transfer(createMessages("Test"), cb); + while (!cb.isComplete()) + { + } + + cb = createCallBackWithMessage("Broker has accepted get"); + message.get(messageGetBody, cb); + } + + // Creates a gneric call back and prints the given message + private AMQPCallBack createCallBackWithMessage(final String msg) + { + AMQPCallBack cb = new AMQPCallBack() + { + + @Override + public void brokerResponded(AMQMethodBody body) + { + System.out.println(msg); + } + + @Override + public void brokerRespondedWithError(AMQException e) + { + } + + }; + + return cb; + } + + public static void main(String[] args) + { + TestClient test = new TestClient(); + try + { + AMQPConnection con = test.openConnection(); + test.handleConnectionNegotiation(con); + test.handleChannelNegotiation(); + test.createExchange(); + test.createAndBindQueue(); + test.publishAndSubscribe(); + test.purgeQueue(); + test.publishAndGet(); + test.deleteQueue(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java new file mode 100644 index 0000000000..18219abc46 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.state; + +/** + * States used in the AMQ protocol. Used by the finite state machine to determine + * valid responses. + */ +public class AMQPState +{ + private final int _id; + + private final String _name; + + private AMQPState(int id, String name) + { + _id = id; + _name = name; + } + + public String toString() + { + //return "AMQState: id = " + _id + " name: " + _name; + return _name; // looks better with loggin + } + + // Connection state + public static final AMQPState CONNECTION_UNDEFINED = new AMQPState(0, "CONNECTION_UNDEFINED"); + public static final AMQPState CONNECTION_NOT_STARTED = new AMQPState(1, "CONNECTION_NOT_STARTED"); + public static final AMQPState CONNECTION_NOT_SECURE = new AMQPState(2, "CONNECTION_NOT_SECURE"); + public static final AMQPState CONNECTION_NOT_TUNED = new AMQPState(2, "CONNECTION_NOT_TUNED"); + public static final AMQPState CONNECTION_NOT_OPENED = new AMQPState(3, "CONNECTION_NOT_OPENED"); + public static final AMQPState CONNECTION_OPEN = new AMQPState(4, "CONNECTION_OPEN"); + public static final AMQPState CONNECTION_CLOSING = new AMQPState(5, "CONNECTION_CLOSING"); + public static final AMQPState CONNECTION_CLOSED = new AMQPState(6, "CONNECTION_CLOSED"); + + // Channel state + public static final AMQPState CHANNEL_NOT_OPENED = new AMQPState(10, "CHANNEL_NOT_OPENED"); + public static final AMQPState CHANNEL_OPENED = new AMQPState(11, "CHANNEL_OPENED"); + public static final AMQPState CHANNEL_CLOSED = new AMQPState(11, "CHANNEL_CLOSED"); + public static final AMQPState CHANNEL_SUSPEND = new AMQPState(11, "CHANNEL_SUSPEND"); + + // Distributed Transaction state + public static final AMQPState DTX_CHANNEL_NOT_SELECTED = new AMQPState(10, "DTX_CHANNEL_NOT_SELECTED"); + public static final AMQPState DTX_NOT_STARTED = new AMQPState(10, "DTX_NOT_STARTED"); + public static final AMQPState DTX_STARTED = new AMQPState(10, "DTX_STARTED"); + public static final AMQPState DTX_END = new AMQPState(10, "DTX_END"); +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java new file mode 100644 index 0000000000..506d267a9e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.state; + +public class AMQPStateChangedEvent +{ + private AMQPState _oldState; + + private AMQPState _newState; + + private AMQPStateType _stateType; + + public AMQPStateChangedEvent(AMQPState oldState, AMQPState newState, AMQPStateType stateType) + { + _oldState = oldState; + _newState = newState; + _stateType = stateType; + } + + public AMQPState getNewState() + { + return _newState; + } + + public void setNewState(AMQPState newState) + { + this._newState = newState; + } + + public AMQPState getOldState() + { + return _oldState; + } + + public void setOldState(AMQPState oldState) + { + this._oldState = oldState; + } + + public AMQPStateType getStateType() + { + return _stateType; + } + + public void setStateType(AMQPStateType stateType) + { + this._stateType = stateType; + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java new file mode 100644 index 0000000000..974f707504 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java @@ -0,0 +1,8 @@ +package org.apache.qpid.nclient.amqp.state; + +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPStateListener +{ + public void stateChanged(AMQPStateChangedEvent event) throws AMQPException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java new file mode 100644 index 0000000000..c1fde7181d --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateMachine.java @@ -0,0 +1,26 @@ +package org.apache.qpid.nclient.amqp.state; + +import org.apache.qpid.nclient.core.AMQPException; + +public class AMQPStateMachine +{ + protected void checkIfValidStateTransition(AMQPState correctState,AMQPState currentState,AMQPState requiredState) throws IllegalStateTransitionException + { + if (currentState != correctState) + { + throw new IllegalStateTransitionException(currentState,requiredState); + } + } + + protected void checkIfValidStateTransition(AMQPState[] correctStates,AMQPState currentState,AMQPState requiredState) throws IllegalStateTransitionException + { + for(AMQPState correctState :correctStates) + { + if (currentState == correctState) + { + return; + } + } + throw new IllegalStateTransitionException(currentState,requiredState); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java new file mode 100644 index 0000000000..9bc60b658e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java @@ -0,0 +1,13 @@ +package org.apache.qpid.nclient.amqp.state; + +import org.apache.qpid.AMQException; +import org.apache.qpid.nclient.core.AMQPException; + +public interface AMQPStateManager +{ + public void addListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException; + + public void removeListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException; + + public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException; +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java new file mode 100644 index 0000000000..e190d639f2 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java @@ -0,0 +1,50 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.state; + +/** + * The Type of States used in the AMQ protocol. + * This allows to partition listeners by the type of states they want + * to listen rather than all. + * For example an Object might only be interested in Channel state + */ +public class AMQPStateType +{ + private final int _typeId; + + private final String _typeName; + + private AMQPStateType(int id, String name) + { + _typeId = id; + _typeName = name; + } + + public String toString() + { + return _typeName; + } + + // Connection state + public static final AMQPStateType CONNECTION_STATE = new AMQPStateType(0, "CONNECTION_STATE"); + public static final AMQPStateType CHANNEL_STATE = new AMQPStateType(1, "CHANNEL_STATE"); + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java new file mode 100644 index 0000000000..fdc24d1d2f --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/IllegalStateTransitionException.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.amqp.state; + +import org.apache.qpid.nclient.core.AMQPException; + +public class IllegalStateTransitionException extends AMQPException +{ + private AMQPState _currentState; + private AMQPState _desiredState; + + public IllegalStateTransitionException(AMQPState currentState, AMQPState desiredState) + { + super("No valid state transition defined from state " + currentState + + " to state " + desiredState); + _currentState = currentState; + _desiredState = desiredState; + } + + public AMQPState getCurrentState() + { + return _currentState; + } + + public AMQPState getDesiredState() + { + return _desiredState; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java new file mode 100644 index 0000000000..7bc77e02c0 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java @@ -0,0 +1,100 @@ +package org.apache.qpid.nclient.config; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Collection; +import java.util.List; +import java.util.TreeMap; + +import javax.security.sasl.SaslClientFactory; + +import org.apache.commons.configuration.CombinedConfiguration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.SystemConfiguration; +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.security.AMQPCallbackHandler; + +/** + * Loads a properties file from classpath. + * These values can be overwritten using system properties + */ +public class ClientConfiguration extends CombinedConfiguration { + + private static final Logger _logger = Logger.getLogger(ClientConfiguration.class); + private static ClientConfiguration _instance = new ClientConfiguration(); + + ClientConfiguration() + { + super(); + addConfiguration(new SystemConfiguration()); + try + { + XMLConfiguration config = new XMLConfiguration(); + config.load(getInputStream()); + addConfiguration(config); + } + catch (ConfigurationException e) + { + _logger.warn("Client Properties missing, using defaults",e); + } + } + + public static ClientConfiguration get() + { + return _instance; + } + + private InputStream getInputStream() + { + if (System.getProperty(QpidConstants.CONFIG_FILE_PATH) != null) + { + try + { + return new FileInputStream((String)System.getProperty(QpidConstants.CONFIG_FILE_PATH)); + } + catch(Exception e) + { + return this.getClass().getResourceAsStream("client.xml"); + } + } + else + { + return this.getClass().getResourceAsStream("client.xml"); + } + + } + + public static void main(String[] args) + { + String key = QpidConstants.AMQP_SECURITY + "." + + QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES + "." + + QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY; + + TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister = + new TreeMap<String, Class<? extends SaslClientFactory>>(); + + int index = ClientConfiguration.get().getMaxIndex(key); + + for (int i=0; i<index+1;i++) + { + String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]"); + String className = ClientConfiguration.get().getString(key + "(" + i + ")" ); + try + { + Class<?> clazz = Class.forName(className); + if (!(SaslClientFactory.class.isAssignableFrom(clazz))) + { + _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping"); + continue; + } + factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz); + } + catch (Exception ex) + { + _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping"); + } + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml new file mode 100644 index 0000000000..6eeedbe1ff --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="ISO-8859-1" ?> +<qpidClientConfig> + + <security> + <saslClientFactoryTypes> + <saslClientFactory type="AMQPLAIN">org.apache.qpid.nclient.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory> + </saslClientFactoryTypes> + <securityMechanisms> + <securityMechanismHandler type="PLAIN">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler> + <securityMechanismHandler type="CRAM_MD5">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler> + </securityMechanisms> + </security> + + <!-- Transport Layer properties --> + <useSharedReadWritePool>false</useSharedReadWritePool> + <enableDirectBuffers>true</enableDirectBuffers> + <enablePooledAllocator>false</enablePooledAllocator> + <tcpNoDelay>true</tcpNoDelay> + <sendBufferSizeInKb>32</sendBufferSizeInKb> + <reciveBufferSizeInKb>32</reciveBufferSizeInKb> + <qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass> + + <!-- Execution Layer properties --> + <maxAccumilatedResponses>20</maxAccumilatedResponses> + + <!-- Model Phase properties --> + <serverTimeoutInMilliSeconds>60000</serverTimeoutInMilliSeconds> + <amqpClassFactory>org.apache.qpid.nclient.amqp.qpid.QpidAMQPClassFactory</amqpClassFactory> + <maxAccumilatedResponses>20</maxAccumilatedResponses> + + <phasePipe> + <phase index="0">org.apache.qpid.nclient.transport.TransportPhase</phase> + <phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase</phase> + <phase index="2">org.apache.qpid.nclient.model.ModelPhase</phase> + </phasePipe> + +</qpidClientConfig>
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java new file mode 100644 index 0000000000..a029f7d4ff --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java @@ -0,0 +1,55 @@ +package org.apache.qpid.nclient.core; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; + + +public class AMQPException extends Exception +{ + private int _errorCode; + + public AMQPException(String message) + { + super(message); + } + + public AMQPException(String msg, Throwable t) + { + super(msg, t); + } + + public AMQPException(int errorCode, String msg, Throwable t) + { + super(msg + " [error code " + errorCode + ']', t); + _errorCode = errorCode; + } + + public AMQPException(int errorCode, String msg) + { + super(msg + " [error code " + errorCode + ']'); + _errorCode = errorCode; + } + + public AMQPException(Logger logger, String msg, Throwable t) + { + this(msg, t); + logger.error(getMessage(), this); + } + + public AMQPException(Logger logger, String msg) + { + this(msg); + logger.error(getMessage(), this); + } + + public AMQPException(Logger logger, int errorCode, String msg) + { + this(errorCode, msg); + logger.error(getMessage(), this); + } + + public int getErrorCode() + { + return _errorCode; + } + } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java new file mode 100644 index 0000000000..bf6a19b920 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.core; + +public abstract class AbstractPhase implements Phase { + + protected PhaseContext _ctx; + protected Phase _nextInFlowPhase; + protected Phase _nextOutFlowPhase; + + + /** + * ------------------------------------------------ + * Phase - method introduced by Phase + * ------------------------------------------------ + */ + public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase) { + _nextInFlowPhase = nextInFlowPhase; + _nextOutFlowPhase = nextOutFlowPhase; + _ctx = ctx; + } + + /** + * The start is called from the top + * of the pipe and is propogated the + * bottom. + * + * Each phase can override this to do + * any phase specific logic related + * pipe.start() + */ + public void start()throws AMQPException + { + if(_nextOutFlowPhase != null) + { + _nextOutFlowPhase.start(); + } + } + + /** + * Each phase can override this to do + * any phase specific cleanup + */ + public void close()throws AMQPException + { + + } + + public void messageReceived(Object frame) throws AMQPException + { + if(_nextInFlowPhase != null) + { + _nextInFlowPhase.messageReceived(frame); + } + } + + public void messageSent(Object frame) throws AMQPException + { + if (_nextOutFlowPhase != null) + { + _nextOutFlowPhase.messageSent(frame); + } + } + + public PhaseContext getPhaseContext() + { + return _ctx; + } + + public Phase getNextInFlowPhase() { + return _nextInFlowPhase; + } + + public Phase getNextOutFlowPhase() { + return _nextOutFlowPhase; + } + + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java new file mode 100644 index 0000000000..a3455cdacd --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java @@ -0,0 +1,20 @@ +package org.apache.qpid.nclient.core; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class DefaultPhaseContext implements PhaseContext +{ + public Map<String,Object> _props = new ConcurrentHashMap<String,Object>(); + + public Object getProperty(String name) + { + return _props.get(name); + } + + public void setProperty(String name, Object value) + { + _props.put(name, value); + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java new file mode 100644 index 0000000000..7aa10b77ff --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java @@ -0,0 +1,38 @@ +package org.apache.qpid.nclient.core; + + +public interface Phase +{ + + /** + * This method is used to initialize a phase + * + * @param ctx + * @param nextInFlowPhase + * @param nextOutFlowPhase + */ + public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase); + + /** + * + * Implement logic related to physical opening + * of the pipe + */ + public void start()throws AMQPException; + + /** + * Implement cleanup in this method. + * This indicates the pipe is closing + */ + public void close()throws AMQPException; + + public void messageReceived(Object msg) throws AMQPException; + + public void messageSent(Object msg) throws AMQPException; + + public PhaseContext getPhaseContext(); + + public Phase getNextOutFlowPhase(); + + public Phase getNextInFlowPhase(); +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java new file mode 100644 index 0000000000..d5942fd785 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.core; + +/** + * This can be thought of as a session context associated + * with the pipe. This is transient and is scoped by the + * duration of the physical connection. + * + */ +public interface PhaseContext { + + public Object getProperty(String name); + + public void setProperty(String name, Object value); + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java new file mode 100644 index 0000000000..9542aab344 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java @@ -0,0 +1,63 @@ +package org.apache.qpid.nclient.core; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.nclient.config.ClientConfiguration; + +public class PhaseFactory +{ + /** + * This method will create the pipe and return a reference + * to the top of the pipeline. + * + * The application can then use this (top most) phase and all + * calls will propogated down the pipe. + * + * Simillar calls orginating at the bottom of the pipeline + * will be propogated to the top. + * + * @param ctx + * @return + * @throws AMQPException + */ + public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException + { + String key = QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE; + Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>(); + List<String> list = ClientConfiguration.get().getList(key); + int index = 0; + for(String s:list) + { + try + { + Phase temp = (Phase)Class.forName(s).newInstance(); + phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + QpidConstants.INDEX),temp) ; + } + catch(Exception e) + { + throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e); + } + index++; + } + + Phase current = null; + Phase prev = null; + Phase next = null; + //Lets build the phase pipe. + for (int i=0; i<phaseMap.size();i++) + { + current = phaseMap.get(i); + if (i+1 < phaseMap.size()) + { + next = phaseMap.get(i+1); + } + current.init(ctx, next, prev); + prev = current; + next = null; + } + + return current; + } +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java new file mode 100644 index 0000000000..034fc28070 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java @@ -0,0 +1,67 @@ +package org.apache.qpid.nclient.core; + +public interface QpidConstants +{ + + // Common properties + public static long EMPTY_CORRELATION_ID = -1; + + public static int CHANNEL_ZERO = 0; + + public static String CONFIG_FILE_PATH = "ConfigFilePath"; + + // Phase Context properties + public final static String AMQP_BROKER_DETAILS = "AMQP_BROKER_DETAILS"; + + public final static String MINA_IO_CONNECTOR = "MINA_IO_CONNECTOR"; + + public final static String EVENT_MANAGER = "EVENT_MANAGER"; + + /**--------------------------------------------------------------- + * Configuration file properties + * ------------------------------------------------------------ + */ + + // Model Layer properties + public final static String SERVER_TIMEOUT_IN_MILLISECONDS = "serverTimeoutInMilliSeconds"; + public final static String AMQP_CLASS_FACTORY = "amqpClassFactory"; + + // MINA properties + public final static String USE_SHARED_READ_WRITE_POOL = "useSharedReadWritePool"; + + public final static String ENABLE_DIRECT_BUFFERS = "enableDirectBuffers"; + + public final static String ENABLE_POOLED_ALLOCATOR = "enablePooledAllocator"; + + public final static String TCP_NO_DELAY = "tcpNoDelay"; + + public final static String SEND_BUFFER_SIZE_IN_KB = "sendBufferSizeInKb"; + + public final static String RECEIVE_BUFFER_SIZE_IN_KB = "reciveBufferSizeInKb"; + + // Security properties + public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES = "saslClientFactoryTypes"; + + public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY = "saslClientFactory"; + + public final static String TYPE = "[@type]"; + + public final static String AMQP_SECURITY = "security"; + + public final static String AMQP_SECURITY_MECHANISMS = "securityMechanisms"; + + public final static String AMQP_SECURITY_MECHANISM_HANDLER = "securityMechanismHandler"; + + // Execution Layer properties + public final static String MAX_ACCUMILATED_RESPONSES = "maxAccumilatedResponses"; + + //Transport Layer properties + public final static String QPID_VM_BROKER_CLASS = "qpidVMBrokerClass"; + + //Phase pipe properties + public final static String PHASE_PIPE = "phasePipe"; + + public final static String PHASE = "phase"; + + public final static String INDEX = "[@index]"; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java b/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java new file mode 100644 index 0000000000..cc1503d414 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java @@ -0,0 +1,19 @@ +package org.apache.qpid.nclient.core; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class TransientPhaseContext implements PhaseContext { + + private Map map = new ConcurrentHashMap(); + + public Object getProperty(String name) { + return map.get(name); + } + + public void setProperty(String name, Object value) { + map.put(name, value); + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java new file mode 100644 index 0000000000..1305500439 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java @@ -0,0 +1,188 @@ +package org.apache.qpid.nclient.execution; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQRequestBody; +import org.apache.qpid.framing.AMQResponseBody; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.AbstractPhase; +import org.apache.qpid.nclient.core.QpidConstants; + +/** + * Corressponds to the Layer 2 in AMQP. + * This phase handles the correlation of amqp messages + * This class implements the 0.9 spec (request/response) + */ +public class ExecutionPhase extends AbstractPhase +{ + + protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class); + + protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap(); + + protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap(); + + /** + * -------------------------------------------------- + * Phase related methods + * -------------------------------------------------- + */ + + // should add these in the init method + //_channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false)); + //_channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false)); + public void messageReceived(Object msg) throws AMQPException + { + AMQFrame frame = (AMQFrame) msg; + final AMQBody bodyFrame = frame.getBodyFrame(); + + if (bodyFrame instanceof AMQRequestBody) + { + AMQPMethodEvent event; + try + { + event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody) bodyFrame); + super.messageReceived(event); + } + catch (Exception e) + { + _logger.error("Error handling request", e); + } + + } + else if (bodyFrame instanceof AMQResponseBody) + { + List<AMQPMethodEvent> events; + try + { + events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody) bodyFrame); + for (AMQPMethodEvent event : events) + { + super.messageReceived(event); + } + } + catch (Exception e) + { + _logger.error("Error handling response", e); + } + } + } + + /** + * Need to figure out if the message is a request or a response + * that needs to be sent and then delegate it to the Request or response manager + * to prepare it. + */ + public void messageSent(Object msg) throws AMQPException + { + AMQPMethodEvent evt = (AMQPMethodEvent) msg; + if (evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID) + { + // This is a request + AMQFrame frame = handleRequest(evt); + super.messageSent(frame); + } + else + { + // This is a response + List<AMQFrame> frames = handleResponse(evt); + for (AMQFrame frame : frames) + { + super.messageSent(frame); + } + } + } + + /** + * ------------------------------------------------ + * Methods to handle request response + * ----------------------------------------------- + */ + private AMQPMethodEvent messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Request frame received: " + requestBody); + } + + ResponseManager responseManager; + if(_channelId2ResponseMgrMap.containsKey(channelId)) + { + responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId); + } + else + { + responseManager = new ResponseManager(0,channelId,false); + _channelId2ResponseMgrMap.put(channelId, responseManager); + } + return responseManager.requestReceived(requestBody); + } + + private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody) + throws Exception + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Response frame received: " + responseBody); + } + + RequestManager requestManager; + if (_channelId2RequestMgrMap.containsKey(channelId)) + { + requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId); + } + else + { + requestManager = new RequestManager(0,channelId,false); + _channelId2RequestMgrMap.put(channelId, requestManager); + } + + return requestManager.responseReceived(responseBody); + } + + private AMQFrame handleRequest(AMQPMethodEvent evt) + { + int channelId = evt.getChannelId(); + RequestManager requestManager; + if (_channelId2RequestMgrMap.containsKey(channelId)) + { + requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId); + } + else + { + requestManager = new RequestManager(0,channelId,false); + _channelId2RequestMgrMap.put(channelId, requestManager); + } + return requestManager.sendRequest(evt); + } + + private List<AMQFrame> handleResponse(AMQPMethodEvent evt) throws AMQPException + { + int channelId = evt.getChannelId(); + ResponseManager responseManager; + if(_channelId2ResponseMgrMap.containsKey(channelId)) + { + responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId); + } + else + { + responseManager = new ResponseManager(0,channelId,false); + _channelId2ResponseMgrMap.put(channelId, responseManager); + } + try + { + return responseManager.sendResponse(evt); + } + catch (Exception e) + { + throw new AMQPException("Error handling response", e); + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java new file mode 100644 index 0000000000..0084f27717 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java @@ -0,0 +1,165 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.execution; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQRequestBody; +import org.apache.qpid.framing.AMQResponseBody; +import org.apache.qpid.framing.RequestResponseMappingException; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; + +public class RequestManager +{ + private static final Logger logger = Logger.getLogger(RequestManager.class); + + private int channel; + + /** + * Used for logging and debugging only - allows the context of this instance + * to be known. + */ + private boolean serverFlag; + private long connectionId; + + /** + * Request and response frames must have a requestID and responseID which + * indepenedently increment from 0 on a per-channel basis. These are the + * counters, and contain the value of the next (not yet used) frame. + */ + private long requestIdCount; + + /** + * These keep track of the last requestId and responseId to be received. + */ + private long lastProcessedResponseId; + + private ConcurrentHashMap<Long, CorrelationID> requestSentMap; + + public RequestManager(long connectionId, int channel, boolean serverFlag) + { + this.channel = channel; + this.serverFlag = serverFlag; + this.connectionId = connectionId; + requestIdCount = 1L; + lastProcessedResponseId = 0L; + requestSentMap = new ConcurrentHashMap<Long, CorrelationID>(); + } + + // *** Functions to originate a request *** + + public AMQFrame sendRequest(AMQPMethodEvent evt) + { + long requestId = getNextRequestId(); // Get new request ID + AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, + lastProcessedResponseId, evt.getMethod()); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + + "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + evt.getMethod()); + } + requestSentMap.put(requestId, new CorrelationID(evt.getCorrelationId(), evt.getLocalCorrelationId())); + return requestFrame; + } + + public List<AMQPMethodEvent> responseReceived(AMQResponseBody responseBody) + throws Exception + { + long requestIdStart = responseBody.getRequestId(); + long requestIdStop = requestIdStart + responseBody.getBatchOffset(); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " + + responseBody + "; " + responseBody.getMethodPayload()); + } + + List<AMQPMethodEvent> events = new ArrayList<AMQPMethodEvent>(); + for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) + { + if (requestSentMap.get(requestId) == null) + { + throw new RequestResponseMappingException(requestId, + "Failed to locate requestId " + requestId + " in requestSentMap."); + } + CorrelationID correlationID = requestSentMap.get(requestId); + AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, responseBody.getMethodPayload(), + correlationID.getSystemCorrelationID(),correlationID.getLocalCorrelationID()); + events.add(methodEvent); + requestSentMap.remove(requestId); + } + lastProcessedResponseId = responseBody.getResponseId(); + return events; + } + + // *** Management functions *** + + public int requestsMapSize() + { + return requestSentMap.size(); + } + + // *** Private helper functions *** + + private long getNextRequestId() + { + return requestIdCount++; + } + + private class CorrelationID + { + // Use for the request/response stuff + private long _systemCorrelationID; + // used internally to track callbacks + private long _localCorrelationID; + + CorrelationID(long systemCorrelationID,long localCorrelationID) + { + _localCorrelationID = localCorrelationID; + _systemCorrelationID = systemCorrelationID; + } + + public long getLocalCorrelationID() + { + return _localCorrelationID; + } + + public void setLocalCorrelationID(long correlationID) + { + _localCorrelationID = correlationID; + } + + public long getSystemCorrelationID() + { + return _systemCorrelationID; + } + + public void setSystemCorrelationID(long correlationID) + { + _systemCorrelationID = correlationID; + } + + + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java new file mode 100644 index 0000000000..c5a75d242f --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java @@ -0,0 +1,240 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.execution; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQRequestBody; +import org.apache.qpid.framing.AMQResponseBody; +import org.apache.qpid.framing.RequestResponseMappingException; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.QpidConstants; + +public class ResponseManager +{ + private static final Logger logger = Logger.getLogger(ResponseManager.class); + + private int channel; + + /** + * Used for logging and debugging only - allows the context of this instance + * to be known. + */ + private boolean serverFlag; + private long connectionId; + + private int maxAccumulatedResponses = 20; // Default + + /** + * Request and response frames must have a requestID and responseID which + * indepenedently increment from 0 on a per-channel basis. These are the + * counters, and contain the value of the next (not yet used) frame. + */ + private long responseIdCount; + + /** + * These keep track of the last requestId and responseId to be received. + */ + private long lastReceivedRequestId; + + /** + * Last requestID sent in a response (for batching) + */ + private long lastSentRequestId; + + private class ResponseStatus implements Comparable<ResponseStatus> + { + private long requestId; + private AMQMethodBody responseMethodBody; + + public ResponseStatus(long requestId) + { + this.requestId = requestId; + responseMethodBody = null; + } + + public int compareTo(ResponseStatus o) + { + return (int)(requestId - o.requestId); + } + + public String toString() + { + // Need to define this + return ""; + } + } + + private ConcurrentHashMap<Long, ResponseStatus> responseMap; + + public ResponseManager(long connectionId, int channel, boolean serverFlag) + { + this.channel = channel; + this.serverFlag = serverFlag; + this.connectionId = connectionId; + responseIdCount = 1L; + lastReceivedRequestId = 0L; + maxAccumulatedResponses = ClientConfiguration.get().getInt(QpidConstants.MAX_ACCUMILATED_RESPONSES); + responseMap = new ConcurrentHashMap<Long, ResponseStatus>(); + } + + // *** Functions to handle an incoming request *** + + public AMQPMethodEvent requestReceived(AMQRequestBody requestBody) throws Exception + { + long requestId = requestBody.getRequestId(); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " + + requestBody + "; " + requestBody.getMethodPayload()); + } + long responseMark = requestBody.getResponseMark(); + lastReceivedRequestId = requestId; + responseMap.put(requestId, new ResponseStatus(requestId)); + + AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, + requestBody.getMethodPayload(), requestId); + + return methodEvent; + } + + public List<AMQFrame> sendResponse(AMQPMethodEvent evt) + throws RequestResponseMappingException + { + long requestId = evt.getCorrelationId(); + AMQMethodBody responseMethodBody = evt.getMethod(); + + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + + "] TX RES: Res[# " + requestId + "]; " + responseMethodBody); + } + + ResponseStatus responseStatus = responseMap.get(requestId); + if (responseStatus == null) + { + throw new RequestResponseMappingException(requestId, + "Failed to locate requestId " + requestId + " in responseMap." + responseMap); + } + if (responseStatus.responseMethodBody != null) + { + throw new RequestResponseMappingException(requestId, "RequestId " + + requestId + " already has a response in responseMap."); + } + responseStatus.responseMethodBody = responseMethodBody; + return doBatches(); + } + + // *** Management functions *** + + /** + * Sends batched responses - i.e. all those members of responseMap that have + * received a response. + */ + public synchronized List<AMQFrame> doBatches() + { + long startRequestId = 0; + int numAdditionalRequestIds = 0; + Class responseMethodBodyClass = null; + List<AMQFrame> frames = new ArrayList<AMQFrame>(); + Iterator<Long> lItr = responseMap.keySet().iterator(); + while (lItr.hasNext()) + { + long requestId = lItr.next(); + ResponseStatus responseStatus = responseMap.get(requestId); + if (responseStatus.responseMethodBody != null) + { + frames.add(sendResponseBatchFrame(requestId, 0, responseStatus.responseMethodBody)); + lItr.remove(); + } + } + + return frames; + } + + /** + * Total number of entries in the responseMap - including both those that + * are outstanding (i.e. no response has been received) and those that are + * batched (those for which responses have been received but have not yet + * been collected together and sent). + */ + public int responsesMapSize() + { + return responseMap.size(); + } + + /** + * As the responseMap may contain both outstanding responses (those with + * ResponseStatus.responseMethodBody still null) and responses waiting to + * be batched (those with ResponseStatus.responseMethodBody not null), we + * need to count only those in the map with responseMethodBody null. + */ + public int outstandingResponses() + { + int cnt = 0; + for (Long requestId : responseMap.keySet()) + { + if (responseMap.get(requestId).responseMethodBody == null) + cnt++; + } + return cnt; + } + + /** + * As the responseMap may contain both outstanding responses (those with + * ResponseStatus.responseMethodBody still null) and responses waiting to + * be batched (those with ResponseStatus.responseMethodBody not null), we + * need to count only those in the map with responseMethodBody not null. + */ + public int batchedResponses() + { + int cnt = 0; + for (Long requestId : responseMap.keySet()) + { + if (responseMap.get(requestId).responseMethodBody != null) + cnt++; + } + return cnt; + } + + // *** Private helper functions *** + + private long getNextResponseId() + { + return responseIdCount++; + } + + private AMQFrame sendResponseBatchFrame(long firstRequestId, int numAdditionalRequests, + AMQMethodBody responseMethodBody) + { + long responseId = getNextResponseId(); // Get new response ID + AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId, + firstRequestId, numAdditionalRequests, responseMethodBody); + return responseFrame; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java new file mode 100644 index 0000000000..d79525c5b2 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.nclient.message; + +import java.util.LinkedList; +import java.util.List; + +public class AMQPApplicationMessage { + + private int bytesReceived = 0; + private int channelId; + private byte[] referenceId; + private List<byte[]> contents = new LinkedList<byte[]>(); + private long deliveryTag; + private boolean redeliveredFlag; + private MessageHeaders messageHeaders; + + public AMQPApplicationMessage(int channelId, byte[] referenceId) + { + this.channelId = channelId; + this.referenceId = referenceId; + } + + public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag) + { + this.channelId = channelId; + this.deliveryTag = deliveryTag; + this.messageHeaders = messageHeaders; + this.redeliveredFlag = redeliveredFlag; + } + + public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content, boolean redeliveredFlag) + { + this.channelId = channelId; + this.deliveryTag = deliveryTag; + this.messageHeaders = messageHeaders; + this.redeliveredFlag = redeliveredFlag; + addContent(content); + } + + public void addContent(byte[] content) + { + contents.add(content); + bytesReceived += content.length; + } + + public int getBytesReceived() + { + return bytesReceived; + } + + public int getChannelId() + { + return channelId; + } + + public byte[] getReferenceId() + { + return referenceId; + } + + public List<byte[]> getContents() + { + return contents; + } + + public long getDeliveryTag() + { + return deliveryTag; + } + + public boolean getRedeliveredFlag() + { + return redeliveredFlag; + } + + public MessageHeaders getMessageHeaders() + { + return messageHeaders; + } + + public String toString() + { + return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" + + deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" + + new String(contents.get(0)); + } + + public void setDeliveryTag(long deliveryTag) + { + this.deliveryTag = deliveryTag; + } + + public void setMessageHeaders(MessageHeaders messageHeaders) + { + this.messageHeaders = messageHeaders; + } + + public void setRedeliveredFlag(boolean redeliveredFlag) + { + this.redeliveredFlag = redeliveredFlag; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java new file mode 100644 index 0000000000..562aa7b06e --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java @@ -0,0 +1,679 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +package org.apache.qpid.nclient.message; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.AMQPInvalidClassException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import java.util.Enumeration; + +public class MessageHeaders +{ + private static final Logger _logger = Logger.getLogger(MessageHeaders.class); + + private AMQShortString _contentType; + + private AMQShortString _encoding; + + private AMQShortString _destination; + + private AMQShortString _exchange; + + private FieldTable _jmsHeaders; + + private short _deliveryMode; + + private short _priority; + + private AMQShortString _correlationId; + + private AMQShortString _replyTo; + + private long _expiration; + + private AMQShortString _messageId; + + private long _timestamp; + + private AMQShortString _type; + + private AMQShortString _userId; + + private AMQShortString _appId; + + private AMQShortString _transactionId; + + private AMQShortString _routingKey; + + private int _size; + + public int getSize() + { + return _size; + } + + public void setSize(int size) + { + this._size = size; + } + + public MessageHeaders() + { + } + + public AMQShortString getContentType() + { + return _contentType; + } + + public void setContentType(AMQShortString contentType) + { + _contentType = contentType; + } + + public AMQShortString getEncoding() + { + return _encoding; + } + + public void setEncoding(AMQShortString encoding) + { + _encoding = encoding; + } + + public FieldTable getJMSHeaders() + { + if (_jmsHeaders == null) + { + setJMSHeaders(FieldTableFactory.newFieldTable()); + } + + return _jmsHeaders; + } + + public void setJMSHeaders(FieldTable headers) + { + _jmsHeaders = headers; + } + + + public short getDeliveryMode() + { + return _deliveryMode; + } + + public void setDeliveryMode(short deliveryMode) + { + _deliveryMode = deliveryMode; + } + + public short getPriority() + { + return _priority; + } + + public void setPriority(short priority) + { + _priority = priority; + } + + public AMQShortString getCorrelationId() + { + return _correlationId; + } + + public void setCorrelationId(AMQShortString correlationId) + { + _correlationId = correlationId; + } + + public AMQShortString getReplyTo() + { + return _replyTo; + } + + public void setReplyTo(AMQShortString replyTo) + { + _replyTo = replyTo; + } + + public long getExpiration() + { + return _expiration; + } + + public void setExpiration(long expiration) + { + _expiration = expiration; + } + + + public AMQShortString getMessageId() + { + return _messageId; + } + + public void setMessageId(AMQShortString messageId) + { + _messageId = messageId; + } + + public long getTimestamp() + { + return _timestamp; + } + + public void setTimestamp(long timestamp) + { + _timestamp = timestamp; + } + + public AMQShortString getType() + { + return _type; + } + + public void setType(AMQShortString type) + { + _type = type; + } + + public AMQShortString getUserId() + { + return _userId; + } + + public void setUserId(AMQShortString userId) + { + _userId = userId; + } + + public AMQShortString getAppId() + { + return _appId; + } + + public void setAppId(AMQShortString appId) + { + _appId = appId; + } + + // MapMessage Interface + + public boolean getBoolean(AMQShortString string) throws JMSException + { + Boolean b = getJMSHeaders().getBoolean(string); + + if (b == null) + { + if (getJMSHeaders().containsKey(string)) + { + Object str = getJMSHeaders().getObject(string); + + if (str == null || !(str instanceof AMQShortString)) + { + throw new MessageFormatException("getBoolean can't use " + string + " item."); + } + else + { + return Boolean.valueOf(((AMQShortString)str).asString()); + } + } + else + { + b = Boolean.valueOf(null); + } + } + + return b; + } + + public char getCharacter(AMQShortString string) throws JMSException + { + Character c = getJMSHeaders().getCharacter(string); + + if (c == null) + { + if (getJMSHeaders().isNullStringValue(string.asString())) + { + throw new NullPointerException("Cannot convert null char"); + } + else + { + throw new MessageFormatException("getChar can't use " + string + " item."); + } + } + else + { + return (char) c; + } + } + + public byte[] getBytes(AMQShortString string) throws JMSException + { + byte[] bs = getJMSHeaders().getBytes(string); + + if (bs == null) + { + throw new MessageFormatException("getBytes can't use " + string + " item."); + } + else + { + return bs; + } + } + + public byte getByte(AMQShortString string) throws JMSException + { + Byte b = getJMSHeaders().getByte(string); + if (b == null) + { + if (getJMSHeaders().containsKey(string)) + { + Object str = getJMSHeaders().getObject(string); + + if (str == null || !(str instanceof AMQShortString)) + { + throw new MessageFormatException("getByte can't use " + string + " item."); + } + else + { + return Byte.valueOf(((AMQShortString)str).asString()); + } + } + else + { + b = Byte.valueOf(null); + } + } + + return b; + } + + public short getShort(AMQShortString string) throws JMSException + { + Short s = getJMSHeaders().getShort(string); + + if (s == null) + { + s = Short.valueOf(getByte(string)); + } + + return s; + } + + public int getInteger(AMQShortString string) throws JMSException + { + Integer i = getJMSHeaders().getInteger(string); + + if (i == null) + { + i = Integer.valueOf(getShort(string)); + } + + return i; + } + + public long getLong(AMQShortString string) throws JMSException + { + Long l = getJMSHeaders().getLong(string); + + if (l == null) + { + l = Long.valueOf(getInteger(string)); + } + + return l; + } + + public float getFloat(AMQShortString string) throws JMSException + { + Float f = getJMSHeaders().getFloat(string); + + if (f == null) + { + if (getJMSHeaders().containsKey(string)) + { + Object str = getJMSHeaders().getObject(string); + + if (str == null || !(str instanceof AMQShortString)) + { + throw new MessageFormatException("getFloat can't use " + string + " item."); + } + else + { + return Float.valueOf(((AMQShortString)str).asString()); + } + } + else + { + f = Float.valueOf(null); + } + + } + + return f; + } + + public double getDouble(AMQShortString string) throws JMSException + { + Double d = getJMSHeaders().getDouble(string); + + if (d == null) + { + d = Double.valueOf(getFloat(string)); + } + + return d; + } + + public AMQShortString getString(AMQShortString string) throws JMSException + { + AMQShortString s = new AMQShortString(getJMSHeaders().getString(string.asString())); + + if (s == null) + { + if (getJMSHeaders().containsKey(string)) + { + Object o = getJMSHeaders().getObject(string); + if (o instanceof byte[]) + { + throw new MessageFormatException("getObject couldn't find " + string + " item."); + } + else + { + if (o == null) + { + return null; + } + else + { + s = (AMQShortString) o; + } + } + } + } + + return s; + } + + public Object getObject(AMQShortString string) throws JMSException + { + return getJMSHeaders().getObject(string); + } + + public void setBoolean(AMQShortString string, boolean b) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setBoolean(string, b); + } + + public void setChar(AMQShortString string, char c) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setChar(string, c); + } + + public Object setBytes(AMQShortString string, byte[] bytes) + { + return getJMSHeaders().setBytes(string, bytes); + } + + public Object setBytes(AMQShortString string, byte[] bytes, int start, int length) + { + return getJMSHeaders().setBytes(string, bytes, start, length); + } + + public void setByte(AMQShortString string, byte b) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setByte(string, b); + } + + public void setShort(AMQShortString string, short i) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setShort(string, i); + } + + public void setInteger(AMQShortString string, int i) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setInteger(string, i); + } + + public void setLong(AMQShortString string, long l) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setLong(string, l); + } + + public void setFloat(AMQShortString string, float v) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setFloat(string, v); + } + + public void setDouble(AMQShortString string, double v) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setDouble(string, v); + } + + public void setString(AMQShortString string, AMQShortString string1) throws JMSException + { + checkPropertyName(string); + getJMSHeaders().setString(string.asString(), string1.asString()); + } + + public void setObject(AMQShortString string, Object object) throws JMSException + { + checkPropertyName(string); + try + { + getJMSHeaders().setObject(string, object); + } + catch (AMQPInvalidClassException aice) + { + throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass()); + } + } + + public boolean itemExists(AMQShortString string) throws JMSException + { + return getJMSHeaders().containsKey(string); + } + + public Enumeration getPropertyNames() + { + return getJMSHeaders().getPropertyNames(); + } + + public void clear() + { + getJMSHeaders().clear(); + } + + public boolean propertyExists(AMQShortString propertyName) + { + return getJMSHeaders().propertyExists(propertyName); + } + + public Object put(Object key, Object value) + { + return getJMSHeaders().setObject(key.toString(), value); + } + + public Object remove(AMQShortString propertyName) + { + return getJMSHeaders().remove(propertyName); + } + + public boolean isEmpty() + { + return getJMSHeaders().isEmpty(); + } + + public void writeToBuffer(ByteBuffer data) + { + getJMSHeaders().writeToBuffer(data); + } + + public Enumeration getMapNames() + { + return getPropertyNames(); + } + + protected static void checkPropertyName(CharSequence propertyName) + { + if (propertyName == null) + { + throw new IllegalArgumentException("Property name must not be null"); + } + else if (propertyName.length() == 0) + { + throw new IllegalArgumentException("Property name must not be the empty string"); + } + + checkIdentiferFormat(propertyName); + } + + protected static void checkIdentiferFormat(CharSequence propertyName) + { +// JMS requirements 3.5.1 Property Names +// Identifiers: +// - An identifier is an unlimited-length character sequence that must begin +// with a Java identifier start character; all following characters must be Java +// identifier part characters. An identifier start character is any character for +// which the method Character.isJavaIdentifierStart returns true. This includes +// '_' and '$'. An identifier part character is any character for which the +// method Character.isJavaIdentifierPart returns true. +// - Identifiers cannot be the names NULL, TRUE, or FALSE. +// – Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or +// ESCAPE. +// – Identifiers are either header field references or property references. The +// type of a property value in a message selector corresponds to the type +// used to set the property. If a property that does not exist in a message is +// referenced, its value is NULL. The semantics of evaluating NULL values +// in a selector are described in Section 3.8.1.2, “Null Values.” +// – The conversions that apply to the get methods for properties do not +// apply when a property is used in a message selector expression. For +// example, suppose you set a property as a string value, as in the +// following: +// myMessage.setStringProperty("NumberOfOrders", "2"); +// The following expression in a message selector would evaluate to false, +// because a string cannot be used in an arithmetic expression: +// "NumberOfOrders > 1" +// – Identifiers are case sensitive. +// – Message header field references are restricted to JMSDeliveryMode, +// JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and +// JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be +// null and if so are treated as a NULL value. + + if (Boolean.getBoolean("strict-jms")) + { + // JMS start character + if (!(Character.isJavaIdentifierStart(propertyName.charAt(0)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character"); + } + + // JMS part character + int length = propertyName.length(); + for (int c = 1; c < length; c++) + { + if (!(Character.isJavaIdentifierPart(propertyName.charAt(c)))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character"); + } + } + + + + + // JMS invalid names + if ((propertyName.equals("NULL") + || propertyName.equals("TRUE") + || propertyName.equals("FALSE") + || propertyName.equals("NOT") + || propertyName.equals("AND") + || propertyName.equals("OR") + || propertyName.equals("BETWEEN") + || propertyName.equals("LIKE") + || propertyName.equals("IN") + || propertyName.equals("IS") + || propertyName.equals("ESCAPE"))) + { + throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS"); + } + } + + } + + public AMQShortString getTransactionId() + { + return _transactionId; + } + + public void setTransactionId(AMQShortString id) + { + _transactionId = id; + } + + public AMQShortString getDestination() + { + return _destination; + } + + public void setDestination(AMQShortString destination) + { + this._destination = destination; + } + + public AMQShortString getExchange() + { + return _exchange; + } + + public void setExchange(AMQShortString exchange) + { + this._exchange = exchange; + } + + public AMQShortString getRoutingKey() + { + return _routingKey; + } + + public void setRoutingKey(AMQShortString routingKey) + { + this._routingKey = routingKey; + } +} + + diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java new file mode 100644 index 0000000000..53a2142718 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.message; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.AbstractPhase; +import org.apache.qpid.nclient.model.ModelPhase; + +public class MessagePhase extends AbstractPhase { + + private final BlockingQueue<AMQPApplicationMessage> _queue = new LinkedBlockingQueue<AMQPApplicationMessage>(); + private static final Logger _logger = Logger.getLogger(ModelPhase.class); + + public void messageReceived(Object msg) throws AMQPException + { + try + { + _queue.put((AMQPApplicationMessage)msg); + } + catch (InterruptedException e) + { + _logger.error("Error adding message to queue", e); + } + super.messageReceived(msg); + } + + public void messageSent(Object msg) throws AMQPException + { + super.messageSent(msg); + } + + public AMQPApplicationMessage getNextMessage() + { + return _queue.poll(); + } + + public AMQPApplicationMessage getNextMessage(long timeout, TimeUnit tu) throws InterruptedException + { + return _queue.poll(timeout, tu); + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java new file mode 100644 index 0000000000..efd7264f96 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java @@ -0,0 +1,16 @@ +package org.apache.qpid.nclient.message; + +import org.apache.qpid.AMQException; + +public interface MessageStore { + + public void removeMessage(String identifier); + + public void storeContentBodyChunk(String identifier,byte[] contentBody) throws AMQException; + + public void storeMessageMetaData(String identifier, MessageHeaders messageHeaders) throws AMQException; + + public AMQPApplicationMessage getMessage(String identifier) throws AMQException; + + public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException; +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java new file mode 100644 index 0000000000..eb5a9c1778 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java @@ -0,0 +1,39 @@ +package org.apache.qpid.nclient.message; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.AMQException; + +public class TransientMessageStore implements MessageStore { + + private Map<String,AMQPApplicationMessage> messageMap = new ConcurrentHashMap<String,AMQPApplicationMessage>(); + + public AMQPApplicationMessage getMessage(String identifier) + throws AMQException + { + return messageMap.get(identifier); + } + + public void removeMessage(String identifier) + { + messageMap.remove(identifier); + } + + public void storeContentBodyChunk(String identifier, byte[] contentBody) + throws AMQException + { + + } + + public void storeMessageMetaData(String identifier, + MessageHeaders messageHeaders) throws AMQException + { + + } + + public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException + { + + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java new file mode 100644 index 0000000000..d845059ee7 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java @@ -0,0 +1,71 @@ +package org.apache.qpid.nclient.model; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.amqp.event.AMQPEventManager; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.AbstractPhase; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.PhaseContext; +import org.apache.qpid.nclient.core.QpidConstants; + +/** + * This Phase handles Layer 3 functionality of the AMQP spec. + * This class acts as the interface between the API and the pipeline + */ +public class ModelPhase extends AbstractPhase { + + private static final Logger _logger = Logger.getLogger(ModelPhase.class); + + private Map <Class,List> _methodListners = new HashMap<Class,List>(); + + /** + * ------------------------------------------------ + * Phase - methods introduced by Phase + * ------------------------------------------------ + */ + public void init(PhaseContext ctx, Phase nextInFlowPhase, Phase nextOutFlowPhase) + { + super.init(ctx, nextInFlowPhase, nextOutFlowPhase); + } + + public void messageReceived(Object msg) throws AMQPException + { + notifyMethodListerners((AMQPMethodEvent)msg); + + // not doing super.methodReceived here, as this is the end of + // the pipeline + //super.messageReceived(msg); + } + + /** + * This method should only except and pass messages + * of Type @see AMQPMethodEvent + */ + public void messageSent(Object msg) throws AMQPException + { + super.messageSent(msg); + } + + /** + * ------------------------------------------------ + * Event Handling + * ------------------------------------------------ + */ + + public void notifyMethodListerners(AMQPMethodEvent event) throws AMQPException + { + AMQPEventManager eventManager = (AMQPEventManager)_ctx.getProperty(QpidConstants.EVENT_MANAGER); + eventManager.notifyEvent(event); + } + + /** + * ------------------------------------------------ + * Configuration + * ------------------------------------------------ + */ +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java new file mode 100644 index 0000000000..fc5878f6ef --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import javax.security.auth.callback.CallbackHandler; + +import org.apache.qpid.nclient.transport.ConnectionURL; + +public interface AMQPCallbackHandler extends CallbackHandler +{ + void initialise(ConnectionURL url); +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java new file mode 100644 index 0000000000..428cd6753d --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.QpidConstants; + +public class CallbackHandlerRegistry +{ + private static final Logger _logger = Logger.getLogger(CallbackHandlerRegistry.class); + + private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry(); + + private Map<String,Class> _mechanismToHandlerClassMap = new HashMap<String,Class>(); + + private String _mechanisms; + + public static CallbackHandlerRegistry getInstance() + { + return _instance; + } + + public Class getCallbackHandlerClass(String mechanism) + { + return _mechanismToHandlerClassMap.get(mechanism); + } + + public String getMechanisms() + { + return _mechanisms; + } + + private CallbackHandlerRegistry() + { + // first we register any Sasl client factories + DynamicSaslRegistrar.registerSaslProviders(); + parseProperties(); + } + + private void parseProperties() + { + String key = QpidConstants.AMQP_SECURITY + "." + + QpidConstants.AMQP_SECURITY_MECHANISMS + "." + + QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER; + + int index = ClientConfiguration.get().getMaxIndex(key); + + for (int i=0; i<index+1;i++) + { + String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]"); + String className = ClientConfiguration.get().getString(key + "(" + i + ")" ); + Class clazz = null; + try + { + clazz = Class.forName(className); + if (!AMQPCallbackHandler.class.isAssignableFrom(clazz)) + { + _logger.warn("SASL provider " + clazz + " does not implement " + AMQPCallbackHandler.class + + ". Skipping"); + continue; + } + _mechanismToHandlerClassMap.put(mechanism, clazz); + if (_mechanisms == null) + { + _mechanisms = mechanism; + } + else + { + // one time cost + _mechanisms = _mechanisms + " " + mechanism; + } + } + catch (ClassNotFoundException ex) + { + _logger.warn("Unable to load class " + className + ". Skipping that SASL provider"); + continue; + } + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java new file mode 100644 index 0000000000..958c6c4782 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import java.security.Security; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import javax.security.sasl.SaslClientFactory; + +import org.apache.log4j.Logger; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.QpidConstants; + +public class DynamicSaslRegistrar +{ + private static final Logger _logger = Logger.getLogger(DynamicSaslRegistrar.class); + + public static void registerSaslProviders() + { + Map<String, Class<? extends SaslClientFactory>> factories = parseProperties(); + if (factories.size() > 0) + { + Security.addProvider(new JCAProvider(factories)); + _logger.debug("Dynamic SASL provider added as a security provider"); + } + } + + private static Map<String, Class<? extends SaslClientFactory>> parseProperties() + { + List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES); + TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister = + new TreeMap<String, Class<? extends SaslClientFactory>>(); + for (String mechanism: mechanisms) + { + String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY + "_" + mechanism); + try + { + Class<?> clazz = Class.forName(className); + if (!(SaslClientFactory.class.isAssignableFrom(clazz))) + { + _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping"); + continue; + } + factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz); + } + catch (Exception ex) + { + _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping"); + } + } + return factoriesToRegister; + } + + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java new file mode 100644 index 0000000000..10ccb88821 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import javax.security.sasl.SaslClientFactory; +import java.security.Provider; +import java.security.Security; +import java.util.Map; + +public class JCAProvider extends Provider +{ + public JCAProvider(Map<String, Class<? extends SaslClientFactory>> providerMap) + { + super("AMQSASLProvider", 1.0, "A JCA provider that registers all " + + "AMQ SASL providers that want to be registered"); + register(providerMap); + Security.addProvider(this); + } + + private void register(Map<String, Class<? extends SaslClientFactory>> providerMap) + { + for (Map.Entry<String, Class<? extends SaslClientFactory>> me : + providerMap.entrySet()) + { + put("SaslClientFactory." + me.getKey(), me.getValue().getName()); + } + } +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java new file mode 100644 index 0000000000..7297d07134 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security; + +import java.io.IOException; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; + +import org.apache.qpid.nclient.transport.ConnectionURL; + +public class UsernamePasswordCallbackHandler implements AMQPCallbackHandler +{ + private ConnectionURL _url; + + public void initialise(ConnectionURL url) + { + _url = url; + } + + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException + { + for (int i = 0; i < callbacks.length; i++) + { + Callback cb = callbacks[i]; + if (cb instanceof NameCallback) + { + ((NameCallback)cb).setName(_url.getUsername()); + } + else if (cb instanceof PasswordCallback) + { + ((PasswordCallback)cb).setPassword((_url.getPassword()).toCharArray()); + } + else + { + throw new UnsupportedCallbackException(cb); + } + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java new file mode 100644 index 0000000000..1097346c1d --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security.amqplain; + +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.FieldTableFactory; + +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.Callback; + +/** + * Implements the "AMQPlain" authentication protocol that uses FieldTables to send username and pwd. + * + */ +public class AmqPlainSaslClient implements SaslClient +{ + /** + * The name of this mechanism + */ + public static final String MECHANISM = "AMQPLAIN"; + + private CallbackHandler _cbh; + + public AmqPlainSaslClient(CallbackHandler cbh) + { + _cbh = cbh; + } + + public String getMechanismName() + { + return "AMQPLAIN"; + } + + public boolean hasInitialResponse() + { + return true; + } + + public byte[] evaluateChallenge(byte[] challenge) throws SaslException + { + // we do not care about the prompt or the default name + NameCallback nameCallback = new NameCallback("prompt", "defaultName"); + PasswordCallback pwdCallback = new PasswordCallback("prompt", false); + Callback[] callbacks = new Callback[]{nameCallback, pwdCallback}; + try + { + _cbh.handle(callbacks); + } + catch (Exception e) + { + throw new SaslException("Error handling SASL callbacks: " + e, e); + } + FieldTable table = FieldTableFactory.newFieldTable(); + table.setString("LOGIN", nameCallback.getName()); + table.setString("PASSWORD", new String(pwdCallback.getPassword())); + return table.getDataAsBytes(); + } + + public boolean isComplete() + { + return true; + } + + public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException + { + throw new SaslException("Not supported"); + } + + public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException + { + throw new SaslException("Not supported"); + } + + public Object getNegotiatedProperty(String propName) + { + return null; + } + + public void dispose() throws SaslException + { + _cbh = null; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java new file mode 100644 index 0000000000..f98c1e3a58 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.security.amqplain; + +import javax.security.sasl.SaslClientFactory; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import javax.security.sasl.Sasl; +import javax.security.auth.callback.CallbackHandler; +import java.util.Map; + +public class AmqPlainSaslClientFactory implements SaslClientFactory +{ + public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException + { + for (int i = 0; i < mechanisms.length; i++) + { + if (mechanisms[i].equals(AmqPlainSaslClient.MECHANISM)) + { + if (cbh == null) + { + throw new SaslException("CallbackHandler must not be null"); + } + return new AmqPlainSaslClient(cbh); + } + } + return null; + } + + public String[] getMechanismNames(Map props) + { + if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) || + props.containsKey(Sasl.POLICY_NODICTIONARY) || + props.containsKey(Sasl.POLICY_NOACTIVE)) + { + // returned array must be non null according to interface documentation + return new String[0]; + } + else + { + return new String[]{AmqPlainSaslClient.MECHANISM}; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java new file mode 100644 index 0000000000..9e878fb839 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java @@ -0,0 +1,344 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; + +import java.util.HashMap; +import java.net.URISyntaxException; +import java.net.URI; + +public class AMQPBrokerDetails implements BrokerDetails +{ + private String _host; + + private int _port; + + private String _transport; + + private HashMap<String, String> _options; + + public AMQPBrokerDetails() + { + _options = new HashMap<String, String>(); + } + + public AMQPBrokerDetails(String url) throws URLSyntaxException + { + this(); + // URL should be of format tcp://host:port?option='value',option='value' + try + { + URI connection = new URI(url); + + String transport = connection.getScheme(); + + // Handles some defaults to minimise changes to existing broker URLS e.g. localhost + if (transport != null) + { + //todo this list of valid transports should be enumerated somewhere + if ((!(transport.equalsIgnoreCase("vm") || transport.equalsIgnoreCase("tcp")))) + { + if (transport.equalsIgnoreCase("localhost")) + { + connection = new URI(DEFAULT_TRANSPORT + "://" + url); + transport = connection.getScheme(); + } + else + { + if (url.charAt(transport.length()) == ':' && url.charAt(transport.length() + 1) != '/') + { + //Then most likely we have a host:port value + connection = new URI(DEFAULT_TRANSPORT + "://" + url); + transport = connection.getScheme(); + } + else + { + URLHelper.parseError(0, transport.length(), "Unknown transport", url); + } + } + } + } + else + { + //Default the transport + connection = new URI(DEFAULT_TRANSPORT + "://" + url); + transport = connection.getScheme(); + } + + if (transport == null) + { + URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" + " In broker URL:'" + url + + "' Format: " + URL_FORMAT_EXAMPLE, ""); + } + + setTransport(transport); + + String host = connection.getHost(); + + // Fix for Java 1.5 + if (host == null) + { + host = ""; + } + + setHost(host); + + int port = connection.getPort(); + + if (port == -1) + { + // Fix for when there is port data but it is not automatically parseable by getPort(). + String auth = connection.getAuthority(); + + if (auth != null && auth.contains(":")) + { + int start = auth.indexOf(":") + 1; + int end = start; + boolean looking = true; + boolean found = false; + //Walk the authority looking for a port value. + while (looking) + { + try + { + end++; + Integer.parseInt(auth.substring(start, end)); + + if (end >= auth.length()) + { + looking = false; + found = true; + } + } + catch (NumberFormatException nfe) + { + looking = false; + } + + } + if (found) + { + setPort(Integer.parseInt(auth.substring(start, end))); + } + else + { + URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1, + "Illegal character in port number", connection.toString()); + } + + } + else + { + setPort(DEFAULT_PORT); + } + } + else + { + setPort(port); + } + + String queryString = connection.getQuery(); + + URLHelper.parseOptions(_options, queryString); + + //Fragment is #string (not used) + } + catch (URISyntaxException uris) + { + if (uris instanceof URLSyntaxException) + { + throw (URLSyntaxException) uris; + } + + URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + } + } + + public AMQPBrokerDetails(String host, int port, boolean useSSL) + { + _host = host; + _port = port; + + if (useSSL) + { + setOption(OPTIONS_SSL, "true"); + } + } + + public String getHost() + { + return _host; + } + + public void setHost(String _host) + { + this._host = _host; + } + + public int getPort() + { + return _port; + } + + public void setPort(int _port) + { + this._port = _port; + } + + public String getTransport() + { + return _transport; + } + + public void setTransport(String _transport) + { + this._transport = _transport; + } + + public String getOption(String key) + { + return _options.get(key); + } + + public void setOption(String key, String value) + { + _options.put(key, value); + } + + public long getTimeout() + { + if (_options.containsKey(OPTIONS_CONNECT_TIMEOUT)) + { + try + { + return Long.parseLong(_options.get(OPTIONS_CONNECT_TIMEOUT)); + } + catch (NumberFormatException nfe) + { + //Do nothing as we will use the default below. + } + } + + return BrokerDetails.DEFAULT_CONNECT_TIMEOUT; + } + + public void setTimeout(long timeout) + { + setOption(OPTIONS_CONNECT_TIMEOUT, Long.toString(timeout)); + } + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(_transport); + sb.append("://"); + + if (!(_transport.equalsIgnoreCase("vm"))) + { + sb.append(_host); + } + + sb.append(':'); + sb.append(_port); + + sb.append(printOptionsURL()); + + return sb.toString(); + } + + public boolean equals(Object o) + { + if (!(o instanceof BrokerDetails)) + { + return false; + } + + BrokerDetails bd = (BrokerDetails) o; + + return _host.equalsIgnoreCase(bd.getHost()) && (_port == bd.getPort()) + && _transport.equalsIgnoreCase(bd.getTransport()) && (useSSL() == bd.useSSL()); + + //todo do we need to compare all the options as well? + } + + private String printOptionsURL() + { + StringBuffer optionsURL = new StringBuffer(); + + optionsURL.append('?'); + + if (!(_options.isEmpty())) + { + + for (String key : _options.keySet()) + { + optionsURL.append(key); + + optionsURL.append("='"); + + optionsURL.append(_options.get(key)); + + optionsURL.append("'"); + + optionsURL.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + } + } + + //removeKey the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options + optionsURL.deleteCharAt(optionsURL.length() - 1); + + return optionsURL.toString(); + } + + public boolean useSSL() + { + // To be friendly to users we should be case insensitive. + // or simply force users to conform to OPTIONS_SSL + // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL + + if (_options.containsKey(OPTIONS_SSL)) + { + return _options.get(OPTIONS_SSL).equalsIgnoreCase("true"); + } + + return USE_SSL_DEFAULT; + } + + public void useSSL(boolean ssl) + { + setOption(OPTIONS_SSL, Boolean.toString(ssl)); + } + + public static String checkTransport(String broker) + { + if ((!broker.contains("://"))) + { + return "tcp://" + broker; + } + else + { + return broker; + } + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java new file mode 100644 index 0000000000..d0259830c6 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPConnectionURL.java @@ -0,0 +1,412 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; + +import java.util.*; +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; + +public class AMQPConnectionURL implements ConnectionURL +{ + private String _url; + private String _failoverMethod; + private HashMap<String, String> _failoverOptions; + private HashMap<String, String> _options; + private List<BrokerDetails> _brokers; + private String _clientName; + private String _username; + private String _password; + private String _virtualHost; + + public AMQPConnectionURL(String fullURL) throws URLSyntaxException + { + _url = fullURL; + _options = new HashMap<String, String>(); + _brokers = new LinkedList<BrokerDetails>(); + _failoverOptions = new HashMap<String, String>(); + + try + { + URI connection = new URI(fullURL); + + if (connection.getScheme() == null || !(connection.getScheme().equalsIgnoreCase(AMQ_PROTOCOL))) + { + throw new URISyntaxException(fullURL, "Not an AMQP URL"); + } + + if (connection.getHost() == null || connection.getHost().equals("")) + { + String uid = getUniqueClientID(); + if (uid == null) + { + URLHelper.parseError(-1, "Client Name not specified", fullURL); + } + else + { + setClientName(uid); + } + + } + else + { + setClientName(connection.getHost()); + } + + String userInfo = connection.getUserInfo(); + + if (userInfo == null) + { + //Fix for Java 1.5 which doesn't parse UserInfo for non http URIs + userInfo = connection.getAuthority(); + + if (userInfo != null) + { + int atIndex = userInfo.indexOf('@'); + + if (atIndex != -1) + { + userInfo = userInfo.substring(0, atIndex); + } + else + { + userInfo = null; + } + } + + } + + if (userInfo == null) + { + URLHelper.parseError(AMQ_PROTOCOL.length() + 3, + "User information not found on url", fullURL); + } + else + { + parseUserInfo(userInfo); + } + String virtualHost = connection.getPath(); + + if (virtualHost != null && (!virtualHost.equals(""))) + { + setVirtualHost(virtualHost); + } + else + { + int authLength = connection.getAuthority().length(); + int start = AMQ_PROTOCOL.length() + 3; + int testIndex = start + authLength; + if (testIndex < fullURL.length() && fullURL.charAt(testIndex) == '?') + { + URLHelper.parseError(start, testIndex - start, "Virtual host found", fullURL); + } + else + { + URLHelper.parseError(-1, "Virtual host not specified", fullURL); + } + + } + + + URLHelper.parseOptions(_options, connection.getQuery()); + + processOptions(); + + //Fragment is #string (not used) + //System.out.println(connection.getFragment()); + + } + catch (URISyntaxException uris) + { + if (uris instanceof URLSyntaxException) + { + throw (URLSyntaxException) uris; + } + + int slash = fullURL.indexOf("\\"); + + if (slash == -1) + { + URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); + } + else + { + if (slash != 0 && fullURL.charAt(slash - 1) == ':') + { + URLHelper.parseError(slash - 2, fullURL.indexOf('?') - slash + 2, "Virtual host looks like a windows path, forward slash not allowed in URL", fullURL); + } + else + { + URLHelper.parseError(slash, "Forward slash not allowed in URL", fullURL); + } + } + + } + } + + private String getUniqueClientID() + { + try + { + InetAddress addr = InetAddress.getLocalHost(); + return addr.getHostName() + System.currentTimeMillis(); + } + catch (UnknownHostException e) + { + return null; + } + } + + private void parseUserInfo(String userinfo) throws URLSyntaxException + { + //user info = user:pass + + int colonIndex = userinfo.indexOf(':'); + + if (colonIndex == -1) + { + URLHelper.parseError(AMQ_PROTOCOL.length() + 3, userinfo.length(), + "Null password in user information not allowed.", _url); + } + else + { + setUsername(userinfo.substring(0, colonIndex)); + setPassword(userinfo.substring(colonIndex + 1)); + } + + } + + private void processOptions() throws URLSyntaxException + { + if (_options.containsKey(OPTIONS_BROKERLIST)) + { + String brokerlist = _options.get(OPTIONS_BROKERLIST); + + //brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value' + StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR); + + while (st.hasMoreTokens()) + { + String broker = st.nextToken(); + + _brokers.add(new AMQPBrokerDetails(broker)); + } + + _options.remove(OPTIONS_BROKERLIST); + } + + if (_options.containsKey(OPTIONS_FAILOVER)) + { + String failover = _options.get(OPTIONS_FAILOVER); + + // failover='method?option='value',option='value'' + + int methodIndex = failover.indexOf('?'); + + if (methodIndex > -1) + { + _failoverMethod = failover.substring(0, methodIndex); + URLHelper.parseOptions(_failoverOptions, failover.substring(methodIndex + 1)); + } + else + { + _failoverMethod = failover; + } + + _options.remove(OPTIONS_FAILOVER); + } + } + + public String getURL() + { + return _url; + } + + public String getFailoverMethod() + { + return _failoverMethod; + } + + public String getFailoverOption(String key) + { + return _failoverOptions.get(key); + } + + public void setFailoverOption(String key, String value) + { + _failoverOptions.put(key, value); + } + + public int getBrokerCount() + { + return _brokers.size(); + } + + public BrokerDetails getBrokerDetails(int index) + { + if (index < _brokers.size()) + { + return _brokers.get(index); + } + else + { + return null; + } + } + + public void addBrokerDetails(BrokerDetails broker) + { + if (!(_brokers.contains(broker))) + { + _brokers.add(broker); + } + } + + public List<BrokerDetails> getAllBrokerDetails() + { + return _brokers; + } + + public String getClientName() + { + return _clientName; + } + + public void setClientName(String clientName) + { + _clientName = clientName; + } + + public String getUsername() + { + return _username; + } + + public void setUsername(String username) + { + _username = username; + } + + public String getPassword() + { + return _password; + } + + public void setPassword(String password) + { + _password = password; + } + + public String getVirtualHost() + { + return _virtualHost; + } + + public void setVirtualHost(String virtuaHost) + { + _virtualHost = virtuaHost; + } + + public String getOption(String key) + { + return _options.get(key); + } + + public void setOption(String key, String value) + { + _options.put(key, value); + } + + public String toString() + { + StringBuffer sb = new StringBuffer(); + + sb.append(AMQ_PROTOCOL); + sb.append("://"); + + if (_username != null) + { + sb.append(_username); + + if (_password != null) + { + sb.append(':'); + sb.append(_password); + } + + sb.append('@'); + } + + sb.append(_clientName); + + sb.append(_virtualHost); + + sb.append(optionsToString()); + + return sb.toString(); + } + + private String optionsToString() + { + StringBuffer sb = new StringBuffer(); + + sb.append("?" + OPTIONS_BROKERLIST + "='"); + + for (BrokerDetails service : _brokers) + { + sb.append(service.toString()); + sb.append(';'); + } + + sb.deleteCharAt(sb.length() - 1); + sb.append("'"); + + if (_failoverMethod != null) + { + sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR); + sb.append(OPTIONS_FAILOVER + "='"); + sb.append(_failoverMethod); + sb.append(URLHelper.printOptions(_failoverOptions)); + sb.append("'"); + } + + return sb.toString(); + } + + + public static void main(String[] args) throws URLSyntaxException + { + + String url2 = "amqp://ritchiem:bob@temp?brokerlist='tcp://localhost:5672;jcp://fancyserver:3000/',failover='roundrobin'"; + //"amqp://user:pass@clientid/virtualhost?brokerlist='tcp://host:1?option1=\'value\',option2=\'value\';vm://:3?option1=\'value\'',failover='method?option1=\'value\',option2='value''"; + + //ConnectionURL connectionurl2 = new AMQConnectionURL(url2); + + System.out.println(url2); + //System.out.println(connectionurl2); + + } + +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java new file mode 100644 index 0000000000..fe20a1e8dd --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/BrokerDetails.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + +public interface BrokerDetails +{ + + /* + * Known URL Options + * @see ConnectionURL + */ + public static final String OPTIONS_RETRY = "retries"; + public static final String OPTIONS_SSL = ConnectionURL.OPTIONS_SSL; + public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout"; + public static final int DEFAULT_PORT = 5672; + + public static final String TCP = "tcp"; + public static final String VM = "vm"; + + public static final String DEFAULT_TRANSPORT = TCP; + + public static final String URL_FORMAT_EXAMPLE = + "<transport>://<hostname>[:<port Default=\"" + DEFAULT_PORT + "\">][?<option>='<value>'[,<option>='<value>']]"; + + public static final long DEFAULT_CONNECT_TIMEOUT = 30000L; + public static final boolean USE_SSL_DEFAULT = false; + + String getHost(); + + void setHost(String host); + + int getPort(); + + void setPort(int port); + + String getTransport(); + + void setTransport(String transport); + + boolean useSSL(); + + void useSSL(boolean ssl); + + String getOption(String key); + + void setOption(String key, String value); + + long getTimeout(); + + void setTimeout(long timeout); + + String toString(); + + boolean equals(Object o); +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java new file mode 100644 index 0000000000..79653dc442 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/ConnectionURL.java @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + +import java.util.List; + +/** + Connection URL format + For TCP: + amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\' + + For VMBroker: + vm://:3/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''" + + Options are of course optional except for requiring a single broker in the broker list. + The option seperator is defined to be either '&' or ',' + */ +public interface ConnectionURL +{ + public static final String AMQ_PROTOCOL = "amqp"; + public static final String OPTIONS_BROKERLIST = "brokerlist"; + public static final String OPTIONS_FAILOVER = "failover"; + public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; + public static final String OPTIONS_SSL = "ssl"; + + String getURL(); + + String getFailoverMethod(); + + String getFailoverOption(String key); + + int getBrokerCount(); + + BrokerDetails getBrokerDetails(int index); + + void addBrokerDetails(BrokerDetails broker); + + List<BrokerDetails> getAllBrokerDetails(); + + String getClientName(); + + void setClientName(String clientName); + + String getUsername(); + + void setUsername(String username); + + String getPassword(); + + void setPassword(String password); + + String getVirtualHost(); + + void setVirtualHost(String virtualHost); + + String getOption(String key); + + void setOption(String key, String value); +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java new file mode 100644 index 0000000000..734aa68a9d --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java @@ -0,0 +1,83 @@ +package org.apache.qpid.nclient.transport; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.DefaultPhaseContext; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.PhaseContext; +import org.apache.qpid.nclient.core.PhaseFactory; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.pool.ReadWriteThreadModel; + +public class TCPConnection implements TransportConnection +{ + private static final Logger _logger = Logger.getLogger(TCPConnection.class); + private BrokerDetails _brokerDetails; + private IoConnector _ioConnector; + private Phase _phase; + private PhaseContext _ctx; + + protected TCPConnection(ConnectionURL url, PhaseContext ctx) + { + _brokerDetails = url.getBrokerDetails(0); + _ctx = ctx; + + ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS)); + + // the MINA default is currently to use the pooled allocator although this may change in future + // once more testing of the performance of the simple allocator has been done + if (ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_POOLED_ALLOCATOR)) + { + // Not sure what the original code wanted use :) + } + else + { + _logger.info("Using SimpleByteBufferAllocator"); + ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + } + + _ioConnector = new SocketConnector(); + SocketConnectorConfig cfg = (SocketConnectorConfig) _ioConnector.getDefaultConfig(); + + // if we do not use our own thread model we get the MINA default which is to use + // its own leader-follower model + if (ClientConfiguration.get().getBoolean(QpidConstants.USE_SHARED_READ_WRITE_POOL)) + { + cfg.setThreadModel(ReadWriteThreadModel.getInstance()); + } + + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + scfg.setTcpNoDelay(ClientConfiguration.get().getBoolean(QpidConstants.TCP_NO_DELAY)); + scfg.setSendBufferSize(ClientConfiguration.get().getInt(QpidConstants.SEND_BUFFER_SIZE_IN_KB)*1024); + scfg.setReceiveBufferSize(ClientConfiguration.get().getInt(QpidConstants.RECEIVE_BUFFER_SIZE_IN_KB)*1024); + } + + // Returns the phase pipe + public Phase connect() throws AMQPException + { + _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); + _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); + + _phase = PhaseFactory.createPhasePipe(_ctx); + _phase.start(); + + return _phase; + } + + public void close() throws AMQPException + { + + } + + public Phase getPhasePipe() + { + return _phase; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java new file mode 100644 index 0000000000..9df353a2df --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnection.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + + +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.Phase; + +public interface TransportConnection +{ + public Phase connect()throws AMQPException; + + public void close()throws AMQPException; + + public Phase getPhasePipe(); +}
\ No newline at end of file diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java new file mode 100644 index 0000000000..ce2145c08b --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java @@ -0,0 +1,36 @@ +package org.apache.qpid.nclient.transport; + +import java.net.URISyntaxException; + +import org.apache.qpid.nclient.core.PhaseContext; + +public class TransportConnectionFactory +{ + public enum ConnectionType + { + TCP,VM + } + + public static TransportConnection createTransportConnection(String url,ConnectionType type, PhaseContext ctx) throws URISyntaxException + { + return createTransportConnection(new AMQPConnectionURL(url),type,ctx); + + } + + public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type, PhaseContext ctx) + { + switch (type) + { + case TCP : default: + { + return new TCPConnection(url,ctx); + } + + case VM : + { + return new VMConnection(url,ctx); + } + } + + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java new file mode 100644 index 0000000000..911e855d4f --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java @@ -0,0 +1,266 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.nclient.transport; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import org.apache.log4j.Logger; +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoHandler; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.WriteFuture; +import org.apache.mina.filter.SSLFilter; +import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.transport.vmpipe.VmPipeAddress; +import org.apache.mina.transport.vmpipe.VmPipeConnector; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersionList; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.AbstractPhase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.ssl.BogusSSLContextFactory; + +/** + * The Transport Phase corresponds to the Layer 1 in AMQP It works at the Frame + * layer + * + */ +public class TransportPhase extends AbstractPhase implements IoHandler, ProtocolVersionList +{ + + private static final Logger _logger = Logger + .getLogger(TransportPhase.class); + + private IoSession _ioSession; + private BrokerDetails _brokerDetails; + + protected WriteFuture _lastWriteFuture; + + /** + * ------------------------------------------------ + * Phase - methods introduced by Phase + * ------------------------------------------------ + */ + + public void start()throws AMQPException + { + _brokerDetails = (BrokerDetails)_ctx.getProperty(QpidConstants.AMQP_BROKER_DETAILS); + IoConnector ioConnector = (IoConnector)_ctx.getProperty(QpidConstants.MINA_IO_CONNECTOR); + + final SocketAddress address; + if (ioConnector instanceof VmPipeConnector) + { + address = new VmPipeAddress(_brokerDetails.getPort()); + } + else + { + address = new InetSocketAddress(_brokerDetails.getHost(), _brokerDetails.getPort()); + _logger.info("Attempting connection to " + address); + + } + + ConnectFuture future = ioConnector.connect(address,this); + + // wait for connection to complete + if (future.join(_brokerDetails.getTimeout())) + { + // we call getSession which throws an IOException if there has been an error connecting + future.getSession(); + } + else + { + throw new AMQPException("Timeout waiting for connection."); + } + } + + public void messageReceived(Object frame) throws AMQPException + { + super.messageReceived(frame); + } + + public void messageSent(Object frame) throws AMQPException + { + _ioSession.write(frame); + } + + /** + * ------------------------------------------------ + * IoHandler - methods introduced by IoHandler + * ------------------------------------------------ + */ + public void sessionIdle(IoSession session, IdleStatus status) + throws Exception + { + _logger.debug("Protocol Session for [ " + this + " : " + session + "] idle: " + + status); + if (IdleStatus.WRITER_IDLE.equals(status)) + { + // write heartbeat frame: + _logger.debug("Sent heartbeat"); + session.write(HeartbeatBody.FRAME); + // HeartbeatDiagnostics.sent(); + } else if (IdleStatus.READER_IDLE.equals(status)) + { + // failover: + // HeartbeatDiagnostics.timeout(); + _logger.warn("Timed out while waiting for heartbeat from peer."); + session.close(); + } + } + + public void messageReceived(IoSession session, Object message) + throws Exception + { + AMQFrame frame = (AMQFrame) message; + final AMQBody bodyFrame = frame.getBodyFrame(); + + if (bodyFrame instanceof HeartbeatBody) + { + _logger.debug("Received heartbeat"); + } else + { + messageReceived(frame); + } + } + + public void messageSent(IoSession session, Object message) throws Exception + { + _logger.debug("Sent frame " + message); + } + + public void exceptionCaught(IoSession session, Throwable cause) + throws Exception + { + // Need to handle failover + _logger.info("Exception caught for [ " + this + " : Session " + System.identityHashCode(session) + "]",cause); + //sessionClosed(session); + } + + public void sessionClosed(IoSession session) throws Exception + { + // Need to handle failover + _logger.info("Protocol Session for [ " + this + " : " + System.identityHashCode(session) + "] closed"); + } + + public void sessionCreated(IoSession session) throws Exception + { + _logger.info("Protocol session created for " + this + " session : " + + System.identityHashCode(session)); + + final ProtocolCodecFilter pcf = new ProtocolCodecFilter( + new AMQCodecFactory(false)); + + if (ClientConfiguration.get().getBoolean( + QpidConstants.USE_SHARED_READ_WRITE_POOL)) + { + session.getFilterChain().addBefore("AsynchronousWriteFilter", + "protocolFilter", pcf); + } + else + { + session.getFilterChain().addLast("protocolFilter", pcf); + } + // we only add the SSL filter where we have an SSL connection + if (_brokerDetails.useSSL()) + { + // FIXME: Bogus context cannot be used in production. + SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory + .getInstance(false)); + sslFilter.setUseClientMode(true); + session.getFilterChain().addBefore("protocolFilter", "ssl", + sslFilter); + } + + try + { + + ReadWriteThreadModel threadModel = ReadWriteThreadModel + .getInstance(); + threadModel.getAsynchronousReadFilter().createNewJobForSession( + session); + threadModel.getAsynchronousWriteFilter().createNewJobForSession( + session); + } catch (RuntimeException e) + { + e.printStackTrace(); + } + + _ioSession = session; + doAMQPConnectionNegotiation(); + } + + public void sessionOpened(IoSession session) throws Exception + { + _logger.info("Protocol session opened for " + this + " : session " + + System.identityHashCode(session)); + } + + /** + * ---------------------------------------------------------- + * Protocol related methods + * ---------------------------------------------------------- + */ + private void doAMQPConnectionNegotiation() + { + int i = pv.length - 1; + _logger.debug("Engaging in connection negotiation"); + writeFrame(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); + } + + /** + * ---------------------------------------------------------- + * Write Operations + * ---------------------------------------------------------- + */ + public void writeFrame(AMQDataBlock frame) + { + writeFrame(frame, false); + } + + public void writeFrame(AMQDataBlock frame, boolean wait) + { + WriteFuture f = _ioSession.write(frame); + if (wait) + { + // fixme -- time out? + f.join(); + } else + { + _lastWriteFuture = f; + } + } + + /** + * ----------------------------------------------------------- + * Failover section + * ----------------------------------------------------------- + */ +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java new file mode 100644 index 0000000000..ba38848149 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java @@ -0,0 +1,135 @@ +package org.apache.qpid.nclient.transport; + +import java.io.IOException; + +import org.apache.log4j.Logger; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoServiceConfig; +import org.apache.mina.transport.vmpipe.VmPipeAcceptor; +import org.apache.mina.transport.vmpipe.VmPipeAddress; +import org.apache.mina.transport.vmpipe.VmPipeConnector; +import org.apache.qpid.nclient.config.ClientConfiguration; +import org.apache.qpid.nclient.core.AMQPException; +import org.apache.qpid.nclient.core.DefaultPhaseContext; +import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.PhaseContext; +import org.apache.qpid.nclient.core.PhaseFactory; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.pool.PoolingFilter; +import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.pool.ReferenceCountingExecutorService; + +public class VMConnection implements TransportConnection +{ + private static final Logger _logger = Logger.getLogger(VMConnection.class); + private BrokerDetails _brokerDetails; + private IoConnector _ioConnector; + private Phase _phase; + private PhaseContext _ctx; + + protected VMConnection(ConnectionURL url,PhaseContext ctx) + { + _brokerDetails = url.getBrokerDetails(0); + _ctx = ctx; + + _ioConnector = new VmPipeConnector(); + final IoServiceConfig cfg = _ioConnector.getDefaultConfig(); + ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance(); + PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executorService, + "AsynchronousReadFilter"); + cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead); + PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executorService, + "AsynchronousWriteFilter"); + cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite); + } + + public Phase connect() throws AMQPException + { + createVMBroker(); + + _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); + _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); + + _phase = PhaseFactory.createPhasePipe(_ctx); + _phase.start(); + + return _phase; + + } + + private void createVMBroker()throws AMQPException + { + _logger.info("Creating InVM Qpid.AMQP listening on port " + _brokerDetails.getPort()); + + VmPipeAcceptor acceptor = new VmPipeAcceptor(); + IoServiceConfig config = acceptor.getDefaultConfig(); + config.setThreadModel(ReadWriteThreadModel.getInstance()); + + IoHandlerAdapter provider = null; + try + { + VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort()); + provider = createBrokerInstance(_brokerDetails.getPort()); + acceptor.bind(pipe, provider); + _logger.info("Created InVM Qpid.AMQP listening on port " + _brokerDetails.getPort()); + } + catch (IOException e) + { + _logger.error(e); + VmPipeAddress pipe = new VmPipeAddress(_brokerDetails.getPort()); + acceptor.unbind(pipe); + + throw new AMQPException("Error creating VM broker",e); + } + } + + private IoHandlerAdapter createBrokerInstance(int port) throws AMQPException + { + String protocolProviderClass = ClientConfiguration.get().getString(QpidConstants.QPID_VM_BROKER_CLASS); + _logger.info("Creating Qpid protocol provider: " + protocolProviderClass); + + // can't use introspection to get Provider as it is a server class. + // need to go straight to IoHandlerAdapter but that requries the queues and exchange from the ApplicationRegistry which we can't access. + + //get correct constructor and pass in instancec ID - "port" + IoHandlerAdapter provider; + try + { + Class[] cnstr = {Integer.class}; + Object[] params = {port}; + provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); + //Give the broker a second to create + _logger.info("Created VMBroker Instance:" + port); + } + catch (Exception e) + { + _logger.info("Unable to create InVM Qpid broker on port " + port + ". due to : " + e.getCause()); + _logger.error(e); + String because; + if (e.getCause() == null) + { + because = e.toString(); + } + else + { + because = e.getCause().toString(); + } + + + throw new AMQPException(port, because + " Stopped InVM Qpid.AMQP creation",e); + } + + return provider; + } + + public void close() throws AMQPException + { + + } + + public Phase getPhasePipe() + { + return _phase; + } +} diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java b/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java new file mode 100644 index 0000000000..e161e30227 --- /dev/null +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/util/AMQPValidator.java @@ -0,0 +1,14 @@ +package org.apache.qpid.nclient.util; + +import org.apache.qpid.nclient.core.AMQPException; + +public class AMQPValidator +{ + public static void throwExceptionOnNull(Object obj, String msg) throws AMQPException + { + if(obj == null) + { + throw new AMQPException(msg); + } + } +} |
