diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-01-08 17:22:59 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-01-08 17:22:59 +0000 |
| commit | 7163772cf96e0f353629984fec667dc45d96bf0d (patch) | |
| tree | 44bd4b3dfc895bbadb191e21263fbc9a1089dda7 /RC7/qpid/java/client/example | |
| parent | 567a44900370d869717c2be65bc0f71402ee1f6b (diff) | |
| download | qpid-python-7163772cf96e0f353629984fec667dc45d96bf0d.tar.gz | |
Tag M4 RC7
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@732765 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC7/qpid/java/client/example')
97 files changed, 7034 insertions, 0 deletions
diff --git a/RC7/qpid/java/client/example/bin/README.txt b/RC7/qpid/java/client/example/bin/README.txt new file mode 100644 index 0000000000..9a1ce91d41 --- /dev/null +++ b/RC7/qpid/java/client/example/bin/README.txt @@ -0,0 +1,11 @@ += Qpid Java Examples = + +For more information read ../README.txt. + +== The Verify All Script == + +The verify_all script will run Java examples against itself and against the C++ +and Python examples. The success of the script is determined by comparing its +output against what is expected. + +This script uses the verify script found in qpid/cpp/examples. diff --git a/RC7/qpid/java/client/example/bin/set_classpath.bat b/RC7/qpid/java/client/example/bin/set_classpath.bat new file mode 100644 index 0000000000..d528967024 --- /dev/null +++ b/RC7/qpid/java/client/example/bin/set_classpath.bat @@ -0,0 +1,50 @@ +@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements. See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership. The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License. You may obtain a copy of the License at
+@REM
+@REM http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied. See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+
+@REM Helper script to set classpath for running Qpid example classes
+@REM NB: You must add the Qpid client and common jars to your CLASSPATH
+@REM before running this script
+
+@echo off
+
+if "%QPID_HOME%" == "" GOTO ERROR_QPID_HOME
+
+set QPIDLIB=%QPID_HOME%\lib
+
+if "%CLASSPATH%" == "" GOTO ERROR_CLASSPATH
+
+set CLASSPATH=%CLASSPATH%;%QPIDLIB%\backport-util-concurrent-2.2.jar
+set CLASSPATH=%CLASSPATH%;%QPIDLIB%\geronimo-jms_1.1_spec-1.0.jar
+set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-collections-3.1.jar
+set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-configuration-1.2.jar
+set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-cli-1.0.jar
+set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-lang-2.1.jar
+set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-logging-api-1.0.4.jar
+set CLASSPATH=%CLASSPATH%;%QPIDLIB%\commons-logging-1.0.jar
+set CLASSPATH=%CLASSPATH%;%QPIDLIB%\log4j-1.2.12.jar
+set CLASSPATH=%CLASSPATH%;%QPIDLIB%\mina-core-1.0.0.jar
+set CLASSPATH=%CLASSPATH%;%QPIDLIB%\mina-filter-ssl-1.0.0.jar
+set CLASSPATH=%CLASSPATH%;%QPIDLIB%\mina-java5-1.0.0.jar
+set CLASSPATH=%CLASSPATH%;%QPIDLIB%\slf4j-simple-1.0.jar
+
+GOTO END
+
+:ERROR_CLASSPATH
+Echo Please set set your CLASSPATH variable to include the Qpid client and common jars. Exiting ....
+:ERROR_QPID_HOME
+Echo Please set QPID_HOME variable. Exiting ....
+:END
diff --git a/RC7/qpid/java/client/example/bin/set_classpath.sh b/RC7/qpid/java/client/example/bin/set_classpath.sh new file mode 100755 index 0000000000..89e9bc8242 --- /dev/null +++ b/RC7/qpid/java/client/example/bin/set_classpath.sh @@ -0,0 +1,83 @@ +#!/bin/sh -xv +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Helper script to set classpath for running Qpid example classes +# NB: You must add the Qpid client and common jars to your CLASSPATH +# before running this script + + +cygwin=false +if [[ "$(uname -a | fgrep Cygwin)" != "" ]]; then + cygwin=true +fi + +#Should have set the QPID_HOME var after install to the working dir e.g. home/qpid/qpid-1.0-incubating-M2-SNAPSHOT +if [ "$QPID_HOME" = "" ] ; then + echo "ERROR: Please set QPID_HOME variable. Exiting ...." + exit 1 +else + QPIDLIB=$QPID_HOME/lib +fi + +if $cygwin; then + QPIDLIB=$(cygpath -w $QPIDLIB) +fi + +if [ "$CLASSPATH" = "" ] ; then + echo "ERROR: Please set set your CLASSPATH variable to include the Qpid client and common jars. Exiting ...." + exit 2 +fi + +#Converts paths for cygwin if req +#Some nasty concatenation to get round cygpath line limits +if $cygwin; then + SEP=";" + CLASSPATH=`cygpath -w $CLASSPATH` + CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/backport-util-concurrent-2.2.jar` + CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/geronimo-jms_1.1_spec-1.0.jar` + CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-collections-3.1.jar` + CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-configuration-1.2.jar` + CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-cli-1.0.jar` + CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-lang-2.1.jar` + CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-logging-api-1.0.4.jar` + CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/commons-logging-1.0.jar` + CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/log4j-1.2.12.jar` + CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/mina-core-1.0.0.jar` + CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/mina-filter-ssl-1.0.0.jar` + CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/mina-java5-1.0.0.jar` + CLASSPATH=$CLASSPATH$SEP`cygpath -w $QPIDLIB/slf4j-simple-1.0.jar` + export CLASSPATH +else + CLASSPATH=$CLASSPATH:$QPIDLIB/backport-util-concurrent-2.2.jar + CLASSPATH=$CLASSPATH:$QPIDLIB/geronimo-jms_1.1_spec-1.0.jar + CLASSPATH=$CLASSPATH:$QPIDLIB/commons-collections-3.1.jar + CLASSPATH=$CLASSPATH:$QPIDLIB/commons-configuration-1.2.jar + CLASSPATH=$CLASSPATH:$QPIDLIB/commons-cli-1.0.jar + CLASSPATH=$CLASSPATH:$QPIDLIB/commons-lang-2.1.jar + CLASSPATH=$CLASSPATH:$QPIDLIB/commons-logging-api-1.0.4.jar + CLASSPATH=$CLASSPATH:$QPIDLIB/commons-logging-1.0.jar + CLASSPATH=$CLASSPATH:$QPIDLIB/log4j-1.2.12.jar + CLASSPATH=$CLASSPATH:$QPIDLIB/mina-core-1.0.0.jar + CLASSPATH=$CLASSPATH:$QPIDLIB/mina-filter-ssl-1.0.0.jar + CLASSPATH=$CLASSPATH:$QPIDLIB/mina-java5-1.0.0.jar + CLASSPATH=$CLASSPATH:$QPIDLIB/slf4j-simple-1.0.jar + export CLASSPATH +fi + diff --git a/RC7/qpid/java/client/example/bin/verify_all b/RC7/qpid/java/client/example/bin/verify_all new file mode 100644 index 0000000000..0212e7d819 --- /dev/null +++ b/RC7/qpid/java/client/example/bin/verify_all @@ -0,0 +1,59 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +export QPID_SRC_HOME=$(cd "$(dirname $0)/../../../.."; pwd) +export CPP=$QPID_SRC_HOME/cpp/examples/examples +export PYTHON=$QPID_SRC_HOME/python/examples +export JAVA=$QPID_SRC_HOME/java/client/example/src/main/java/org/apache/qpid/example/jmsexample + +export AMQP_SPEC=$QPID_SRC_HOME/specs/amqp.0-10.xml +export PYTHONPATH=$QPID_SRC_HOME/python/ + +trap cleanup EXIT + +run_broker(){ + $QPID_SRC_HOME/cpp/src/qpidd -d --no-data-dir --auth no +} + +stop_broker(){ + $QPID_SRC_HOME/cpp/src/qpidd -q +} + +cleanup(){ + if [ -e /tmp/qpidd.5672.pid ]; then + stop_broker + fi + find $CPP -name '*.out' | xargs rm -f + find $PYTHON -name '*.out' | xargs rm -f + find $JAVA -name '*.out' | xargs rm -f +} + +QPID_LIBS=`find $QPID_SRC_HOME/java/build/lib -name '*.jar' | tr '\n' ":"` +export CLASSPATH=$QPID_LIBS:$CLASSPATH + +verify=$QPID_SRC_HOME/cpp/examples/verify + +for script in $(find $JAVA -name 'verify*' -not -path '*.svn' -not -name '*.*') +do + run_broker + $verify $script + stop_broker +done + diff --git a/RC7/qpid/java/client/example/build.xml b/RC7/qpid/java/client/example/build.xml new file mode 100644 index 0000000000..8bcd59d829 --- /dev/null +++ b/RC7/qpid/java/client/example/build.xml @@ -0,0 +1,27 @@ +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<project name="AMQ Client" default="build"> + + <property name="module.depends" value="common client"/> + + <import file="../../module.xml"/> + +</project> diff --git a/RC7/qpid/java/client/example/source-jar.xml b/RC7/qpid/java/client/example/source-jar.xml new file mode 100644 index 0000000000..60451448b8 --- /dev/null +++ b/RC7/qpid/java/client/example/source-jar.xml @@ -0,0 +1,35 @@ +<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<!-- This is an assembly descriptor that produces a jar file that contains all the
+ dependencies, fully expanded into a single jar, required to run the tests of
+ a maven project.
+ -->
+<assembly>
+ <id>source</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>src/main/java</directory>
+ <outputDirectory></outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/RC7/qpid/java/client/example/src/main/java/README.txt b/RC7/qpid/java/client/example/src/main/java/README.txt new file mode 100644 index 0000000000..c8fe6a76ad --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/README.txt @@ -0,0 +1,17 @@ +In order to use the runSample script, you are required to set two environment +variables, QPID_HOME and QPID_SAMPLE. If not the default values will be used. + +QPID_HOME +--------- +This is the distribution directory. You would have set the QPID_HOME when you +started the Qpid Java broker. + +default: /usr/share/java/ + +QPID_SAMPLE +----------- +This is the parent directory of the directory in which you find the runSample.sh +(Ex:- $QPID_SRC_HOME/java/client/example/src/main) + +default: /usr/share/doc/rhm-0.2 + diff --git a/RC7/qpid/java/client/example/src/main/java/log4j.xml b/RC7/qpid/java/client/example/src/main/java/log4j.xml new file mode 100644 index 0000000000..0b38b14c02 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/log4j.xml @@ -0,0 +1,49 @@ +<?xml version="1.0"?> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + <appender name="FileAppender" class="org.apache.log4j.FileAppender"> + <param name="File" value="qpid_messaging.log"/> + <param name="Append" value="false"/> + + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%t %-5p %c{2} - %m%n"/> + </layout> + </appender> + + <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender"> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/> + </layout> + </appender> + + <logger name="org.apache"> + <!-- Print only messages of level warn or above in the package org.apache --> + <level value="warn"/> + </logger> + + <root> + <priority value="info"/> + <appender-ref ref="STDOUT"/> + <!-- <appender-ref ref="FileAppender"/> --> + </root> +</log4j:configuration>
\ No newline at end of file diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java new file mode 100755 index 0000000000..38073cb7f2 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java @@ -0,0 +1,55 @@ +package org.apache.qpid.example.amqpexample.direct; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Session; + +/** + * This creates a queue a queue and binds it to the + * amq.direct exchange + * + */ +public class DeclareQueue +{ + + public static void main(String[] args) + { + // Create connection + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest",false); + + // Create session + Session session = con.createSession(0); + + // declare and bind queue + session.queueDeclare("message_queue", null, null); + session.exchangeBind("message_queue", "amq.direct", "routing_key", null); + + // confirm completion + session.sync(); + + //cleanup + session.close(); + con.close(); + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java new file mode 100755 index 0000000000..2234eb22da --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java @@ -0,0 +1,67 @@ +package org.apache.qpid.example.amqpexample.direct; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.nio.ByteBuffer; + +import org.apache.qpid.api.Message; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.Session; + +public class DirectProducer +{ + + public static void main(String[] args) + { + // Create connection + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest",false); + + // Create session + Session session = con.createSession(0); + DeliveryProperties deliveryProps = new DeliveryProperties(); + deliveryProps.setRoutingKey("routing_key"); + + for (int i=0; i<10; i++) + { + session.messageTransfer("amq.direct", MessageAcceptMode.EXPLICIT,MessageAcquireMode.PRE_ACQUIRED, + new Header(deliveryProps), + "Message " + i); + } + + session.messageTransfer("amq.direct", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, + new Header(deliveryProps), + "That's all, folks!"); + + // confirm completion + session.sync(); + + //cleanup + session.close(); + con.close(); + } + +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java new file mode 100755 index 0000000000..93bb097268 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java @@ -0,0 +1,103 @@ +package org.apache.qpid.example.amqpexample.direct; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; + +/** + * This listens to messages on a queue and terminates + * when it sees the final message + * + */ +public class Listener implements SessionListener +{ + + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + System.out.println("Message: " + xfr); + } + + public void exception(Session ssn, SessionException exc) + { + exc.printStackTrace(); + } + + public void closed(Session ssn) {} + + /** + * This sends 10 messages to the + * amq.direct exchange using the + * routing key as "routing_key" + * + */ + public static void main(String[] args) throws InterruptedException + { + // Create connection + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest",false); + + // Create session + Session session = con.createSession(0); + + // Create an instance of the listener + Listener listener = new Listener(); + session.setSessionListener(listener); + + // create a subscription + session.messageSubscribe("message_queue", + "listener_destination", + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); + + + // issue credits + // XXX + session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); + session.messageFlow("listener_destination", MessageCreditUnit.MESSAGE, 11); + + // confirm completion + session.sync(); + + // wait to receive all the messages + System.out.println("Waiting 100 seconds for messages from listener_destination"); + Thread.sleep(100*1000); + System.out.println("Shutting down listener for listener_destination"); + session.messageCancel("listener_destination"); + + //cleanup + session.close(); + con.close(); + } + +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java new file mode 100755 index 0000000000..9c3ec2fb3b --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/DeclareQueue.java @@ -0,0 +1,55 @@ +package org.apache.qpid.example.amqpexample.fanout; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.Session; + +/** + * This creates a queue a queue and binds it to the + * amq.direct exchange + * + */ +public class DeclareQueue +{ + + public static void main(String[] args) + { + // Create connection + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest",false); + + // Create session + Session session = con.createSession(0); + + // declare and bind queue + session.queueDeclare("message_queue", null, null); + session.exchangeBind("message_queue", "amq.fanout",null, null); + + // confirm completion + session.sync(); + + //cleanup + session.close(); + con.close(); + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java new file mode 100755 index 0000000000..39d34713c6 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java @@ -0,0 +1,66 @@ +package org.apache.qpid.example.amqpexample.fanout; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.Session; + +public class FannoutProducer +{ + /** + * This sends 10 messages to the + * amq.fannout exchange + */ + public static void main(String[] args) + { + // Create connection + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest",false); + + // Create session + Session session = con.createSession(0); + DeliveryProperties deliveryProps = new DeliveryProperties(); + deliveryProps.setRoutingKey("routing_key"); + + for (int i=0; i<10; i++) + { + session.messageTransfer("amq.fanout", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, + new Header(deliveryProps), "Message " + i); + } + + session.messageTransfer("amq.fanout", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, + new Header(deliveryProps), + "That's all, folks!"); + + // confirm completion + session.sync(); + + //cleanup + session.close(); + con.close(); + } + +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java new file mode 100755 index 0000000000..4c72ce75a5 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java @@ -0,0 +1,103 @@ +package org.apache.qpid.example.amqpexample.fanout; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; + +/** + * This listens to messages on a queue and terminates + * when it sees the final message + * + */ +public class Listener implements SessionListener +{ + + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + System.out.println("Message: " + xfr); + } + + public void exception(Session ssn, SessionException exc) + { + exc.printStackTrace(); + } + + public void closed(Session ssn) {} + + /** + * This sends 10 messages to the + * amq.direct exchange using the + * routing key as "routing_key" + * + */ + public static void main(String[] args) throws InterruptedException + { + // Create connection + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest",false); + + // Create session + Session session = con.createSession(0); + + // Create an instance of the listener + Listener listener = new Listener(); + session.setSessionListener(listener); + + // create a subscription + session.messageSubscribe("message_queue", + "listener_destination", + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); + + + // issue credits + // XXX + session.messageFlow("listener_destination", MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); + session.messageFlow("listener_destination", MessageCreditUnit.MESSAGE, 11); + + // confirm completion + session.sync(); + + // check to see if we have received all the messages + System.out.println("Waiting 100 seconds for messages from listener_destination"); + Thread.sleep(100*1000); + System.out.println("Shutting down listener for listener_destination"); + session.messageCancel("listener_destination"); + + //cleanup + session.close(); + con.close(); + } + +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java new file mode 100755 index 0000000000..5e6d3c6f69 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java @@ -0,0 +1,112 @@ +package org.apache.qpid.example.amqpexample.pubsub; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; + + +public class TopicListener implements SessionListener +{ + + public void opened(Session ssn) {} + + public void message(Session ssn, MessageTransfer xfr) + { + DeliveryProperties dp = xfr.getHeader().get(DeliveryProperties.class); + System.out.println("Message: " + xfr + " with routing_key " + dp.getRoutingKey()); + } + + public void exception(Session ssn, SessionException exc) + { + exc.printStackTrace(); + } + + public void closed(Session ssn) {} + + public void prepareQueue(Session session,String queueName,String bindingKey) + { + session.queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE); + session.exchangeBind(queueName, "amq.topic", bindingKey, null); + session.exchangeBind(queueName, "amq.topic", "control", null); + + session.messageSubscribe(queueName, queueName, + MessageAcceptMode.NONE, + MessageAcquireMode.PRE_ACQUIRED, + null, 0, null); + // issue credits + // XXX: need to be able to set to null + session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); + session.messageFlow(queueName, MessageCreditUnit.MESSAGE, 24); + } + + public void cancelSubscription(Session session,String dest) + { + session.messageCancel(dest); + } + + public static void main(String[] args) throws InterruptedException + { + // Create connection + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest",false); + + // Create session + Session session = con.createSession(0); + + // Create an instance of the listener + TopicListener listener = new TopicListener(); + session.setSessionListener(listener); + + listener.prepareQueue(session,"usa", "usa.#"); + listener.prepareQueue(session,"europe", "europe.#"); + listener.prepareQueue(session,"news", "#.news"); + listener.prepareQueue(session,"weather", "#.weather"); + + // confirm completion + session.sync(); + + System.out.println("Waiting 100 seconds for messages"); + Thread.sleep(100*1000); + + System.out.println("Shutting down listeners"); + listener.cancelSubscription(session,"usa"); + listener.cancelSubscription(session,"europe"); + listener.cancelSubscription(session,"news"); + listener.cancelSubscription(session,"weather"); + + //cleanup + session.close(); + con.close(); + } + +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java new file mode 100755 index 0000000000..facf08eeca --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java @@ -0,0 +1,80 @@ +package org.apache.qpid.example.amqpexample.pubsub; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.Session; + +public class TopicPublisher +{ + + public void publishMessages(Session session, String routing_key) + { + // Set the routing key once, we'll use the same routing key for all + // messages. + + DeliveryProperties deliveryProps = new DeliveryProperties(); + deliveryProps.setRoutingKey(routing_key); + + for (int i=0; i<5; i++) { + session.messageTransfer("amq.topic", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, + new Header(deliveryProps), "Message " + i); + } + + } + + public void noMoreMessages(Session session) + { + session.messageTransfer("amq.topic", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, + new Header(new DeliveryProperties().setRoutingKey("control")), + "That's all, folks!"); + } + + public static void main(String[] args) + { + // Create connection + Connection con = new Connection(); + con.connect("localhost", 5672, "test", "guest", "guest",false); + + // Create session + Session session = con.createSession(0); + + // Create an instance of the listener + TopicPublisher publisher = new TopicPublisher(); + + publisher.publishMessages(session, "usa.news"); + publisher.publishMessages(session, "usa.weather"); + publisher.publishMessages(session, "europe.news"); + publisher.publishMessages(session, "europe.weather"); + + // confirm completion + session.sync(); + + //cleanup + session.close(); + con.close(); + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Consumer.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Consumer.java new file mode 100644 index 0000000000..f84b16f485 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Consumer.java @@ -0,0 +1,152 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.jmsexample.direct; + +import java.util.Properties; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.Context; +import javax.naming.InitialContext; + +/** + * The example creates a MessageConsumer on the specified + * Queue which is used to synchronously consume messages. + */ +public class Consumer +{ + /** + * Used in log output. + */ + private static final String CLASS = "Consumer"; + + + /** + * Run the message consumer example. + * @param args Command line arguments. + */ + public static void main(String[] args) + { + Consumer syncConsumer = new Consumer(); + syncConsumer.runTest(); + } + + /** + * Start the example. + */ + private void runTest() + { + try + { + // Load JNDI properties + Properties properties = new Properties(); + properties.load(this.getClass().getResourceAsStream("direct.properties")); + + //Create the initial context + Context ctx = new InitialContext(properties); + + // look up destination + Destination destination = (Destination)ctx.lookup("directQueue"); + + // Lookup the connection factory + ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory"); + // create the connection + Connection connection = conFac.createConnection(); + + // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println( + CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.exit(0); + } + }); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a MessageConsumer + System.out.println(CLASS + ": Creating a MessageConsumer"); + MessageConsumer messageConsumer = session.createConsumer(destination); + + // Now the messageConsumer is set up we can start the connection + System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages"); + connection.start(); + + // Cycle round until all the messages are consumed. + Message message; + boolean end = false; + while (!end) + { + message = messageConsumer.receive(); + String text; + if (message instanceof TextMessage) + { + text = ((TextMessage) message).getText(); + } + else + { + byte[] body = new byte[(int) ((BytesMessage) message).getBodyLength()]; + ((BytesMessage) message).readBytes(body); + text = new String(body); + } + if (text.equals("That's all, folks!")) + { + System.out.println(CLASS + ": Received final message " + text); + end = true; + } + else + { + System.out.println(CLASS + ": Received message: " + text); + } + } + + // Close the connection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + ctx.close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Listener.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Listener.java new file mode 100644 index 0000000000..d2e1180c9b --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Listener.java @@ -0,0 +1,208 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.jmsexample.direct; + +import java.util.Properties; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.Context; +import javax.naming.InitialContext; + +/** + * The example creates a MessageConsumer on the specified + * Queue and uses a MessageListener with this MessageConsumer + * in order to enable asynchronous delivery. + */ +public class Listener implements MessageListener +{ + /* Used in log output. */ + private static final String CLASS = "Listener"; + + /** + * An object to synchronize on. + */ + private final static Object _lock = new Object(); + + /** + * A boolean to indicate a clean finish. + */ + private static boolean _finished = false; + + /** + * A boolean to indicate an unsuccesful finish. + */ + private static boolean _failed = false; + + + /** + * Run the message consumer example. + * @param args Command line arguments. + */ + public static void main(String[] args) + { + Listener listener = new Listener(); + listener.runTest(); + } + + /** + * Start the example. + */ + private void runTest() + { + try + { + // Load JNDI properties + Properties properties = new Properties(); + properties.load(this.getClass().getResourceAsStream("direct.properties")); + + //Create the initial context + Context ctx = new InitialContext(properties); + + // look up destination + Destination destination = (Destination)ctx.lookup("directQueue"); + + // Lookup the connection factory + ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory"); + // create the connection + Connection connection = conFac.createConnection(); + + // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println( + CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.exit(0); + } + }); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses + // the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a MessageConsumer + System.out.println(CLASS + ": Creating a MessageConsumer"); + + MessageConsumer messageConsumer = session.createConsumer(destination); + + // Set a message listener on the messageConsumer + messageConsumer.setMessageListener(this); + + // Now the messageConsumer is set up we can start the connection + System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages"); + connection.start(); + + // Wait for the messageConsumer to have received all the messages it needs + synchronized (_lock) + { + while (!_finished && !_failed) + { + _lock.wait(); + } + } + + // If the MessageListener abruptly failed (probably due to receiving a non-text message) + if (_failed) + { + System.out.println(CLASS + ": This sample failed as it received unexpected messages"); + } + + // Close the connection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + ctx.close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } + + /** + * This method is required by the <CODE>MessageListener</CODE> interface. It + * will be invoked when messages are available. + * After receiving the finish message (That's all, folks!) it releases a lock so that the + * main program may continue. + * + * @param message The message. + */ + public void onMessage(Message message) + { + try + { + String text; + if (message instanceof TextMessage) + { + text = ((TextMessage) message).getText(); + } + else + { + byte[] body = new byte[(int) ((BytesMessage) message).getBodyLength()]; + ((BytesMessage) message).readBytes(body); + text = new String(body); + } + if (text.equals("That's all, folks!")) + { + System.out.println(CLASS + ": Received final message " + text); + synchronized (_lock) + { + _finished = true; + _lock.notifyAll(); + } + } + else + { + System.out.println(CLASS + ": Received message: " + text); + } + } + catch (JMSException exp) + { + System.out.println(CLASS + ": Caught an exception handling a received message"); + exp.printStackTrace(); + synchronized (_lock) + { + _failed = true; + _lock.notifyAll(); + } + } + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java new file mode 100644 index 0000000000..259756764c --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java @@ -0,0 +1,138 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.jmsexample.direct; + +import java.util.Properties; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.Context; +import javax.naming.InitialContext; + +/** + * Message producer example, sends message to a queue. + */ +public class Producer +{ + /* Used in log output. */ + private static final String CLASS = "Producer"; + + private int numMessages = 10; + private short deliveryMode = DeliveryMode.NON_PERSISTENT; + + /** + * Create a Producer client. + */ + public Producer () + { + } + + /** + * Run the message producer example. + * @param args Command line arguments. + */ + public static void main(String[] args) + { + Producer producer = new Producer(); + producer.runTest(); + } + + private void runTest() + { + try + { + + // Load JNDI properties + Properties properties = new Properties(); + properties.load(this.getClass().getResourceAsStream("direct.properties")); + + //Create the initial context + Context ctx = new InitialContext(properties); + + // look up destination + Destination destination = (Destination)ctx.lookup("directQueue"); + + // Lookup the connection factory + ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory"); + // create the connection + Connection connection = conFac.createConnection(); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + e.printStackTrace(); + } + }); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // lookup the queue + //Queue destination = session.createQueue(_queueName); + + // Create a Message producer + System.out.println(CLASS + ": Creating a Message Producer"); + MessageProducer messageProducer = session.createProducer(destination); + + // Create a Message + TextMessage message; + System.out.println(CLASS + ": Creating a TestMessage to send to the destination"); + + // Loop to publish the requested number of messages. + for (int i = 1; i < numMessages + 1; i++) + { + // NOTE: We have NOT HAD TO START THE CONNECTION TO BEGIN SENDING messages, + // this is different to the consumer end as a CONSUMERS CONNECTIONS MUST BE STARTED BEFORE RECEIVING. + message = session.createTextMessage("Message " + i); + System.out.println(CLASS + ": Sending message: " + i); + messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + + // And send a final message to indicate termination. + message = session.createTextMessage("That's all, folks!"); + messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + // Close the connection to the broker + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + ctx.close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + exp.printStackTrace(); + } + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties new file mode 100644 index 0000000000..a2f5843e7a --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' + +# Register an AMQP destination in JNDI +# destination.[jniName] = [BindingURL] +destination.directQueue = direct://amq.direct//message_queue?routingkey='routing_key'
\ No newline at end of file diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify new file mode 100644 index 0000000000..66b4b3c54e --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +# The JMS producer doesn't create qeueues so utilising the c++ declare_queues +cpp=$CPP/direct + +direct_consumer_java() +{ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.direct.Consumer +} + +direct_producer_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.direct.Producer +} + +clients $cpp/declare_queues direct_producer_java direct_consumer_java +outputs $cpp/declare_queues.out ./direct_producer_java.out ./direct_consumer_java.out diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify.in new file mode 100644 index 0000000000..c87e5716c8 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify.in @@ -0,0 +1,35 @@ +==== declare_queues.out +==== direct_producer_java.out +Producer: Creating a non-transacted, auto-acknowledged session +Producer: Creating a Message Producer +Producer: Creating a TestMessage to send to the destination +Producer: Sending message: 1 +Producer: Sending message: 2 +Producer: Sending message: 3 +Producer: Sending message: 4 +Producer: Sending message: 5 +Producer: Sending message: 6 +Producer: Sending message: 7 +Producer: Sending message: 8 +Producer: Sending message: 9 +Producer: Sending message: 10 +Producer: Closing connection +Producer: Closing JNDI context +==== direct_consumer_java.out +Consumer: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Consumer: Creating a non-transacted, auto-acknowledged session +Consumer: Creating a MessageConsumer +Consumer: Starting connection so MessageConsumer can receive messages +Consumer: Received message: Message 1 +Consumer: Received message: Message 2 +Consumer: Received message: Message 3 +Consumer: Received message: Message 4 +Consumer: Received message: Message 5 +Consumer: Received message: Message 6 +Consumer: Received message: Message 7 +Consumer: Received message: Message 8 +Consumer: Received message: Message 9 +Consumer: Received message: Message 10 +Consumer: Received final message That's all, folks! +Consumer: Closing connection +Consumer: Closing JNDI context diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_cpp_java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_cpp_java new file mode 100644 index 0000000000..d581c4c1aa --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_cpp_java @@ -0,0 +1,10 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +cpp=$CPP/direct + +direct_consumer_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.direct.Consumer +} + +clients $cpp/declare_queues $cpp/direct_producer direct_consumer_java +outputs $cpp/declare_queues.out $cpp/direct_producer.out ./direct_consumer_java.out + diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_cpp_java.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_cpp_java.in new file mode 100644 index 0000000000..b50692da1f --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_cpp_java.in @@ -0,0 +1,20 @@ +==== declare_queues.out +==== direct_producer.out +==== direct_consumer_java.out +Consumer: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Consumer: Creating a non-transacted, auto-acknowledged session +Consumer: Creating a MessageConsumer +Consumer: Starting connection so MessageConsumer can receive messages +Consumer: Received message: Message 0 +Consumer: Received message: Message 1 +Consumer: Received message: Message 2 +Consumer: Received message: Message 3 +Consumer: Received message: Message 4 +Consumer: Received message: Message 5 +Consumer: Received message: Message 6 +Consumer: Received message: Message 7 +Consumer: Received message: Message 8 +Consumer: Received message: Message 9 +Consumer: Received final message That's all, folks! +Consumer: Closing connection +Consumer: Closing JNDI context diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_java_cpp b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_java_cpp new file mode 100644 index 0000000000..573cac6986 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_java_cpp @@ -0,0 +1,10 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +cpp=$CPP/direct + +direct_producer_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.direct.Producer +} + +clients $cpp/declare_queues direct_producer_java $cpp/listener +outputs $cpp/declare_queues.out ./direct_producer_java.out $cpp/listener.out + diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_java_cpp.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_java_cpp.in new file mode 100644 index 0000000000..946c19953f --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_java_cpp.in @@ -0,0 +1,30 @@ +==== declare_queues.out +==== direct_producer_java.out +Producer: Creating a non-transacted, auto-acknowledged session +Producer: Creating a Message Producer +Producer: Creating a TestMessage to send to the destination +Producer: Sending message: 1 +Producer: Sending message: 2 +Producer: Sending message: 3 +Producer: Sending message: 4 +Producer: Sending message: 5 +Producer: Sending message: 6 +Producer: Sending message: 7 +Producer: Sending message: 8 +Producer: Sending message: 9 +Producer: Sending message: 10 +Producer: Closing connection +Producer: Closing JNDI context +==== listener.out +Message: Message 1 +Message: Message 2 +Message: Message 3 +Message: Message 4 +Message: Message 5 +Message: Message 6 +Message: Message 7 +Message: Message 8 +Message: Message 9 +Message: Message 10 +Message: That's all, folks! +Shutting down listener for message_queue diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_java_python b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_java_python new file mode 100644 index 0000000000..61c033e969 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_java_python @@ -0,0 +1,9 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +py=$PYTHON_EXAMPLES/direct + +direct_producer_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.direct.Producer +} + +clients $py/declare_queues.py direct_producer_java $py/direct_consumer.py +outputs $py/declare_queues.py.out ./direct_producer_java.out $py/direct_consumer.py.out diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_java_python.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_java_python.in new file mode 100644 index 0000000000..e012405352 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_java_python.in @@ -0,0 +1,29 @@ +==== declare_queues.py.out +==== direct_producer_java.out +Producer: Creating a non-transacted, auto-acknowledged session +Producer: Creating a Message Producer +Producer: Creating a TestMessage to send to the destination +Producer: Sending message: 1 +Producer: Sending message: 2 +Producer: Sending message: 3 +Producer: Sending message: 4 +Producer: Sending message: 5 +Producer: Sending message: 6 +Producer: Sending message: 7 +Producer: Sending message: 8 +Producer: Sending message: 9 +Producer: Sending message: 10 +Producer: Closing connection +Producer: Closing JNDI context +==== direct_consumer.py.out +Message 1 +Message 2 +Message 3 +Message 4 +Message 5 +Message 6 +Message 7 +Message 8 +Message 9 +Message 10 +That's all, folks! diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_python_java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_python_java new file mode 100644 index 0000000000..4182331f3f --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_python_java @@ -0,0 +1,10 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +py=$PYTHON_EXAMPLES/direct + +direct_consumer_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.direct.Consumer +} + +clients $py/declare_queues.py $py/direct_producer.py direct_consumer_java +outputs $py/declare_queues.py.out $py/direct_producer.py.out ./direct_consumer_java.out + diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_python_java.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_python_java.in new file mode 100644 index 0000000000..6a9c9fdd10 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/verify_python_java.in @@ -0,0 +1,20 @@ +==== declare_queues.py.out +==== direct_producer.py.out +==== direct_consumer_java.out +Consumer: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Consumer: Creating a non-transacted, auto-acknowledged session +Consumer: Creating a MessageConsumer +Consumer: Starting connection so MessageConsumer can receive messages +Consumer: Received message: message 0 +Consumer: Received message: message 1 +Consumer: Received message: message 2 +Consumer: Received message: message 3 +Consumer: Received message: message 4 +Consumer: Received message: message 5 +Consumer: Received message: message 6 +Consumer: Received message: message 7 +Consumer: Received message: message 8 +Consumer: Received message: message 9 +Consumer: Received final message That's all, folks! +Consumer: Closing connection +Consumer: Closing JNDI context diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Consumer.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Consumer.java new file mode 100755 index 0000000000..daa1b10b6b --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Consumer.java @@ -0,0 +1,165 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.jmsexample.fanout; + +import java.util.Properties; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.Context; +import javax.naming.InitialContext; + +/** + * The example creates a MessageConsumer on the specified + * Queue which is used to synchronously consume messages. + */ +public class Consumer +{ + /** + * Used in log output. + */ + private static final String CLASS = "Consumer"; + + /** + * Create a Consumer client. + * + */ + public Consumer() + { + } + + /** + * Run the message consumer example. + * + * @param args Command line arguments. + */ + public static void main(String[] args) throws Exception + { + if (args.length == 0) + { + throw new Exception("You need to specify the JNDI name for the queue"); + } + Consumer syncConsumer = new Consumer(); + syncConsumer.runTest(args[0]); + } + + /** + * Start the example. + */ + private void runTest(String queueName) + { + try + { + // Load JNDI properties + Properties properties = new Properties(); + properties.load(this.getClass().getResourceAsStream("fanout.properties")); + + //Create the initial context + Context ctx = new InitialContext(properties); + + // look up destination + Destination destination = (Destination)ctx.lookup(queueName); + + // Lookup the connection factory + ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory"); + // create the connection + Connection connection = conFac.createConnection(); + + // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println( + CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.exit(0); + } + }); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a MessageConsumer + System.out.println(CLASS + ": Creating a MessageConsumer"); + MessageConsumer messageConsumer = session.createConsumer(destination); + + // Now the messageConsumer is set up we can start the connection + System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages"); + connection.start(); + + // Cycle round until all the messages are consumed. + Message message; + boolean end = false; + while (!end) + { + message = messageConsumer.receive(); + String text; + if (message instanceof TextMessage) + { + text = ((TextMessage) message).getText(); + } + else + { + byte[] body = new byte[(int) ((BytesMessage) message).getBodyLength()]; + ((BytesMessage) message).readBytes(body); + text = new String(body); + } + if (text.equals("That's all, folks!")) + { + System.out.println(CLASS + ": Received final message " + text); + end = true; + } + else + { + System.out.println(CLASS + ": Received message: " + text); + } + } + + // Close the connection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + ctx.close(); + } + catch (Exception exp) + { + exp.printStackTrace(); + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Listener.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Listener.java new file mode 100755 index 0000000000..fb750693b2 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Listener.java @@ -0,0 +1,201 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.jmsexample.fanout; + +import java.util.Properties; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; + +/** + * The example creates a MessageConsumer on the specified + * Queue and uses a MessageListener with this MessageConsumer + * in order to enable asynchronous delivery. + */ +public class Listener implements MessageListener +{ + /* Used in log output. */ + private static final String CLASS = "Listener"; + + /** + * An object to synchronize on. + */ + private final Object _lock = new Object(); + + /** + * A boolean to indicate a clean finish. + */ + private boolean _finished = false; + + /** + * A boolean to indicate an unsuccesful finish. + */ + private boolean _failed = false; + + + + /** + * Run the message consumer example. + * + * @param args Command line arguments. + */ + public static void main(String[] args) throws Exception + { + if (args.length == 0) + { + throw new Exception("You need to specify the JNDI name for the queue"); + } + Listener listener = new Listener(); + listener.runTest(args[0]); + } + + /** + * Start the example. + */ + private void runTest(String queueName) + { + try + { + Properties properties = new Properties(); + properties.load(this.getClass().getResourceAsStream("fanout.properties")); + + //Create the initial context + Context ctx = new InitialContext(properties); + + Destination destination = (Destination)ctx.lookup(queueName); + + // Declare the connection + ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory"); + Connection connection = conFac.createConnection(); + + // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println( + CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.exit(0); + } + }); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses + // the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a MessageConsumer + System.out.println(CLASS + ": Creating a MessageConsumer"); + + MessageConsumer messageConsumer = session.createConsumer(destination); + + // Set a message listener on the messageConsumer + messageConsumer.setMessageListener(this); + + // Now the messageConsumer is set up we can start the connection + System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages"); + connection.start(); + + // Wait for the messageConsumer to have received all the messages it needs + synchronized (_lock) + { + while (!_finished && !_failed) + { + _lock.wait(); + } + } + + // If the MessageListener abruptly failed (probably due to receiving a non-text message) + if (_failed) + { + System.out.println(CLASS + ": This sample failed as it received unexpected messages"); + } + + // Close the connection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + ctx.close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } + + /** + * This method is required by the <CODE>MessageListener</CODE> interface. It + * will be invoked when messages are available. + * After receiving the finish message (That's all, folks!) it releases a lock so that the + * main program may continue. + * + * @param message The message. + */ + public void onMessage(Message message) + { + try + { + String text; + if (message instanceof TextMessage) + { + text = ((TextMessage) message).getText(); + } + else + { + byte[] body = new byte[(int) ((BytesMessage) message).getBodyLength()]; + ((BytesMessage) message).readBytes(body); + text = new String(body); + } + if (text.equals("That's all, folks!")) + { + System.out.println(CLASS + ": Received final message " + text); + synchronized (_lock) + { + _finished = true; + _lock.notifyAll(); + } + } + else + { + System.out.println(CLASS + ": Received message: " + text); + } + } + catch (JMSException exp) + { + System.out.println(CLASS + ": Caught an exception handling a received message"); + exp.printStackTrace(); + synchronized (_lock) + { + _failed = true; + _lock.notifyAll(); + } + } + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Producer.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Producer.java new file mode 100755 index 0000000000..2e360f37bb --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Producer.java @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.jmsexample.fanout; + +import java.util.Properties; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; + +/** + * Message producer example, sends message to a queue. + */ +public class Producer +{ + /* Used in log output. */ + private static final String CLASS = "Producer"; + + /* The queue name */ + private int numMessages = 10; + private short deliveryMode = DeliveryMode.NON_PERSISTENT; + + + /** + * Run the message producer example. + * @param args Command line arguments. + */ + public static void main(String[] args) + { + Producer producer = new Producer(); + producer.runTest(); + } + + private void runTest() + { + try + { + + Properties properties = new Properties(); + properties.load(this.getClass().getResourceAsStream("fanout.properties")); + + //Create the initial context + Context ctx = new InitialContext(properties); + + Destination destination = (Destination)ctx.lookup("fanoutQueue"); + + // Declare the connection + ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory"); + Connection connection = conFac.createConnection(); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // lookup the queue + //Queue destination = session.createQueue(_queueName); + + // Create a Message producer + System.out.println(CLASS + ": Creating a Message Producer"); + MessageProducer messageProducer = session.createProducer(destination); + + // Create a Message + TextMessage message; + System.out.println(CLASS + ": Creating a TestMessage to send to the destination"); + + // Loop to publish the requested number of messages. + for (int i = 1; i < numMessages + 1; i++) + { + // NOTE: We have NOT HAD TO START THE CONNECTION TO BEGIN SENDING messages, + // this is different to the consumer end as a CONSUMERS CONNECTIONS MUST BE STARTED BEFORE RECEIVING. + message = session.createTextMessage("Message " + i); + System.out.println(CLASS + ": Sending message: " + i); + messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + + // And send a final message to indicate termination. + message = session.createTextMessage("That's all, folks!"); + messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + // Close the connection to the broker + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + ctx.close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + exp.printStackTrace(); + } + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties new file mode 100644 index 0000000000..901994541d --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' + +# Register an AMQP destination in JNDI +# destination.[jniName] = [BindingURL] +destination.fanoutQueue1 = fanout://amq.fanout//message_queue1 +destination.fanoutQueue2 = fanout://amq.fanout//message_queue2 +destination.fanoutQueue3 = fanout://amq.fanout//message_queue3 + +# for producer +destination.fanoutQueue = fanout://amq.fanout//message_queue
\ No newline at end of file diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify new file mode 100644 index 0000000000..6617e37e85 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +# The JMS producer doesn't create qeueues so utilising the c++ declare_queues +cpp=$CPP/fanout + +fanout_listener_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.fanout.Listener $1 +} + +fanout_producer_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.fanout.Producer +} + +background "can receive messages" fanout_listener_java fanoutQueue1 +background "can receive messages" fanout_listener_java fanoutQueue2 +background "can receive messages" fanout_listener_java fanoutQueue3 +clients fanout_producer_java +outputs ./fanout_producer_java.out "./fanout_listener_java.out | remove_uuid" "./fanout_listener_javaX.out | remove_uuid" "./fanout_listener_javaXX.out | remove_uuid" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify.in new file mode 100644 index 0000000000..c36a515c2a --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify.in @@ -0,0 +1,70 @@ +==== fanout_producer_java.out +Producer: Creating a non-transacted, auto-acknowledged session +Producer: Creating a Message Producer +Producer: Creating a TestMessage to send to the destination +Producer: Sending message: 1 +Producer: Sending message: 2 +Producer: Sending message: 3 +Producer: Sending message: 4 +Producer: Sending message: 5 +Producer: Sending message: 6 +Producer: Sending message: 7 +Producer: Sending message: 8 +Producer: Sending message: 9 +Producer: Sending message: 10 +Producer: Closing connection +Producer: Closing JNDI context +==== fanout_listener_java.out | remove_uuid +Listener: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Listener: Creating a non-transacted, auto-acknowledged session +Listener: Creating a MessageConsumer +Listener: Starting connection so MessageConsumer can receive messages +Listener: Received message: Message 1 +Listener: Received message: Message 2 +Listener: Received message: Message 3 +Listener: Received message: Message 4 +Listener: Received message: Message 5 +Listener: Received message: Message 6 +Listener: Received message: Message 7 +Listener: Received message: Message 8 +Listener: Received message: Message 9 +Listener: Received message: Message 10 +Listener: Received final message That's all, folks! +Listener: Closing connection +Listener: Closing JNDI context +==== fanout_listener_javaX.out | remove_uuid +Listener: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Listener: Creating a non-transacted, auto-acknowledged session +Listener: Creating a MessageConsumer +Listener: Starting connection so MessageConsumer can receive messages +Listener: Received message: Message 1 +Listener: Received message: Message 2 +Listener: Received message: Message 3 +Listener: Received message: Message 4 +Listener: Received message: Message 5 +Listener: Received message: Message 6 +Listener: Received message: Message 7 +Listener: Received message: Message 8 +Listener: Received message: Message 9 +Listener: Received message: Message 10 +Listener: Received final message That's all, folks! +Listener: Closing connection +Listener: Closing JNDI context +==== fanout_listener_javaXX.out | remove_uuid +Listener: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Listener: Creating a non-transacted, auto-acknowledged session +Listener: Creating a MessageConsumer +Listener: Starting connection so MessageConsumer can receive messages +Listener: Received message: Message 1 +Listener: Received message: Message 2 +Listener: Received message: Message 3 +Listener: Received message: Message 4 +Listener: Received message: Message 5 +Listener: Received message: Message 6 +Listener: Received message: Message 7 +Listener: Received message: Message 8 +Listener: Received message: Message 9 +Listener: Received message: Message 10 +Listener: Received final message That's all, folks! +Listener: Closing connection +Listener: Closing JNDI context diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_cpp_java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_cpp_java new file mode 100644 index 0000000000..de057ea3b1 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_cpp_java @@ -0,0 +1,13 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +# The JMS producer doesn't create qeueues so utilising the c++ declare_queues +cpp=$CPP/fanout + +fanout_listener_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.fanout.Listener $1 +} + +background "can receive messages" fanout_listener_java fanoutQueue1 +background "can receive messages" fanout_listener_java fanoutQueue2 +background "can receive messages" fanout_listener_java fanoutQueue3 +clients $cpp/fanout_producer +outputs $cpp/fanout_producer.out "./fanout_listener_java.out | remove_uuid" "./fanout_listener_javaX.out | remove_uuid" "./fanout_listener_javaXX.out | remove_uuid" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_cpp_java.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_cpp_java.in new file mode 100644 index 0000000000..905fe3d0bc --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_cpp_java.in @@ -0,0 +1,55 @@ +==== fanout_producer.out +==== fanout_listener_java.out | remove_uuid +Listener: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Listener: Creating a non-transacted, auto-acknowledged session +Listener: Creating a MessageConsumer +Listener: Starting connection so MessageConsumer can receive messages +Listener: Received message: Message 0 +Listener: Received message: Message 1 +Listener: Received message: Message 2 +Listener: Received message: Message 3 +Listener: Received message: Message 4 +Listener: Received message: Message 5 +Listener: Received message: Message 6 +Listener: Received message: Message 7 +Listener: Received message: Message 8 +Listener: Received message: Message 9 +Listener: Received final message That's all, folks! +Listener: Closing connection +Listener: Closing JNDI context +==== fanout_listener_javaX.out | remove_uuid +Listener: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Listener: Creating a non-transacted, auto-acknowledged session +Listener: Creating a MessageConsumer +Listener: Starting connection so MessageConsumer can receive messages +Listener: Received message: Message 0 +Listener: Received message: Message 1 +Listener: Received message: Message 2 +Listener: Received message: Message 3 +Listener: Received message: Message 4 +Listener: Received message: Message 5 +Listener: Received message: Message 6 +Listener: Received message: Message 7 +Listener: Received message: Message 8 +Listener: Received message: Message 9 +Listener: Received final message That's all, folks! +Listener: Closing connection +Listener: Closing JNDI context +==== fanout_listener_javaXX.out | remove_uuid +Listener: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Listener: Creating a non-transacted, auto-acknowledged session +Listener: Creating a MessageConsumer +Listener: Starting connection so MessageConsumer can receive messages +Listener: Received message: Message 0 +Listener: Received message: Message 1 +Listener: Received message: Message 2 +Listener: Received message: Message 3 +Listener: Received message: Message 4 +Listener: Received message: Message 5 +Listener: Received message: Message 6 +Listener: Received message: Message 7 +Listener: Received message: Message 8 +Listener: Received message: Message 9 +Listener: Received final message That's all, folks! +Listener: Closing connection +Listener: Closing JNDI context diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_java_cpp b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_java_cpp new file mode 100644 index 0000000000..dab6114572 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_java_cpp @@ -0,0 +1,13 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +# The JMS producer doesn't create qeueues so utilising the c++ declare_queues +cpp=$CPP/fanout + +fanout_producer_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.fanout.Producer +} + +background "Listening" $cpp/listener +background "Listening" $cpp/listener +background "Listening" $cpp/listener +clients fanout_producer_java +outputs ./fanout_producer_java.out "$cpp/listener.out | remove_uuid" "$cpp/listenerX.out | remove_uuid" "$cpp/listenerXX.out | remove_uuid" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_java_cpp.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_java_cpp.in new file mode 100644 index 0000000000..03e75e39c6 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_java_cpp.in @@ -0,0 +1,58 @@ +==== fanout_producer_java.out +Producer: Creating a non-transacted, auto-acknowledged session +Producer: Creating a Message Producer +Producer: Creating a TestMessage to send to the destination +Producer: Sending message: 1 +Producer: Sending message: 2 +Producer: Sending message: 3 +Producer: Sending message: 4 +Producer: Sending message: 5 +Producer: Sending message: 6 +Producer: Sending message: 7 +Producer: Sending message: 8 +Producer: Sending message: 9 +Producer: Sending message: 10 +Producer: Closing connection +Producer: Closing JNDI context +==== listener.out | remove_uuid +Listening +Message: Message 1 +Message: Message 2 +Message: Message 3 +Message: Message 4 +Message: Message 5 +Message: Message 6 +Message: Message 7 +Message: Message 8 +Message: Message 9 +Message: Message 10 +Message: That's all, folks! +Shutting down listener for +==== listenerX.out | remove_uuid +Listening +Message: Message 1 +Message: Message 2 +Message: Message 3 +Message: Message 4 +Message: Message 5 +Message: Message 6 +Message: Message 7 +Message: Message 8 +Message: Message 9 +Message: Message 10 +Message: That's all, folks! +Shutting down listener for +==== listenerXX.out | remove_uuid +Listening +Message: Message 1 +Message: Message 2 +Message: Message 3 +Message: Message 4 +Message: Message 5 +Message: Message 6 +Message: Message 7 +Message: Message 8 +Message: Message 9 +Message: Message 10 +Message: That's all, folks! +Shutting down listener for diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_java_python b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_java_python new file mode 100644 index 0000000000..1641d88354 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_java_python @@ -0,0 +1,13 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +# The JMS producer doesn't create qeueues so utilising the c++ declare_queues +py=$PYTHON_EXAMPLES/fanout + +fanout_producer_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.fanout.Producer +} + +background "Subscribed" $py/fanout_consumer.py +background "Subscribed" $py/fanout_consumer.py +background "Subscribed" $py/fanout_consumer.py +clients fanout_producer_java +outputs ./fanout_producer_java.out "$py/fanout_consumer.py.out | remove_uuid" "$py/fanout_consumer.pyX.out | remove_uuid" "$py/fanout_consumer.pyXX.out | remove_uuid" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_java_python.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_java_python.in new file mode 100644 index 0000000000..0089e55c16 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_java_python.in @@ -0,0 +1,55 @@ +==== fanout_producer_java.out +Producer: Creating a non-transacted, auto-acknowledged session +Producer: Creating a Message Producer +Producer: Creating a TestMessage to send to the destination +Producer: Sending message: 1 +Producer: Sending message: 2 +Producer: Sending message: 3 +Producer: Sending message: 4 +Producer: Sending message: 5 +Producer: Sending message: 6 +Producer: Sending message: 7 +Producer: Sending message: 8 +Producer: Sending message: 9 +Producer: Sending message: 10 +Producer: Closing connection +Producer: Closing JNDI context +==== fanout_consumer.py.out | remove_uuid +Subscribed to queue +Message 1 +Message 2 +Message 3 +Message 4 +Message 5 +Message 6 +Message 7 +Message 8 +Message 9 +Message 10 +That's all, folks! +==== fanout_consumer.pyX.out | remove_uuid +Subscribed to queue +Message 1 +Message 2 +Message 3 +Message 4 +Message 5 +Message 6 +Message 7 +Message 8 +Message 9 +Message 10 +That's all, folks! +==== fanout_consumer.pyXX.out | remove_uuid +Subscribed to queue +Message 1 +Message 2 +Message 3 +Message 4 +Message 5 +Message 6 +Message 7 +Message 8 +Message 9 +Message 10 +That's all, folks! diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_python_java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_python_java new file mode 100644 index 0000000000..0f05663985 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_python_java @@ -0,0 +1,13 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +# The JMS producer doesn't create qeueues so utilising the c++ declare_queues +py=$PYTHON_EXAMPLES/fanout + +fanout_listener_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.fanout.Listener $1 +} + +background "can receive messages" fanout_listener_java fanoutQueue1 +background "can receive messages" fanout_listener_java fanoutQueue2 +background "can receive messages" fanout_listener_java fanoutQueue3 +clients $py/fanout_producer.py +outputs $py/fanout_producer.py.out "./fanout_listener_java.out | remove_uuid" "./fanout_listener_javaX.out | remove_uuid" "./fanout_listener_javaXX.out | remove_uuid" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_python_java.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_python_java.in new file mode 100644 index 0000000000..1d8e1c2790 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/verify_python_java.in @@ -0,0 +1,55 @@ +==== fanout_producer.py.out +==== fanout_listener_java.out | remove_uuid +Listener: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Listener: Creating a non-transacted, auto-acknowledged session +Listener: Creating a MessageConsumer +Listener: Starting connection so MessageConsumer can receive messages +Listener: Received message: message 0 +Listener: Received message: message 1 +Listener: Received message: message 2 +Listener: Received message: message 3 +Listener: Received message: message 4 +Listener: Received message: message 5 +Listener: Received message: message 6 +Listener: Received message: message 7 +Listener: Received message: message 8 +Listener: Received message: message 9 +Listener: Received final message That's all, folks! +Listener: Closing connection +Listener: Closing JNDI context +==== fanout_listener_javaX.out | remove_uuid +Listener: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Listener: Creating a non-transacted, auto-acknowledged session +Listener: Creating a MessageConsumer +Listener: Starting connection so MessageConsumer can receive messages +Listener: Received message: message 0 +Listener: Received message: message 1 +Listener: Received message: message 2 +Listener: Received message: message 3 +Listener: Received message: message 4 +Listener: Received message: message 5 +Listener: Received message: message 6 +Listener: Received message: message 7 +Listener: Received message: message 8 +Listener: Received message: message 9 +Listener: Received final message That's all, folks! +Listener: Closing connection +Listener: Closing JNDI context +==== fanout_listener_javaXX.out | remove_uuid +Listener: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Listener: Creating a non-transacted, auto-acknowledged session +Listener: Creating a MessageConsumer +Listener: Starting connection so MessageConsumer can receive messages +Listener: Received message: message 0 +Listener: Received message: message 1 +Listener: Received message: message 2 +Listener: Received message: message 3 +Listener: Received message: message 4 +Listener: Received message: message 5 +Listener: Received message: message 6 +Listener: Received message: message 7 +Listener: Received message: message 8 +Listener: Received message: message 9 +Listener: Received final message That's all, folks! +Listener: Closing connection +Listener: Closing JNDI context diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Listener.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Listener.java new file mode 100644 index 0000000000..1a3d40041d --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Listener.java @@ -0,0 +1,214 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.jmsexample.pubsub; + +import java.util.Properties; + +import javax.jms.BytesMessage; +import javax.jms.ConnectionFactory; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; +import javax.naming.Context; +import javax.naming.InitialContext; + +/** + * The example creates a TopicSubscriber on the specified + * Topic and uses a MessageListener with this TopicSubscriber + * in order to enable asynchronous delivery. + */ +public class Listener +{ + /* Used in log output. */ + private static final String CLASS="Listener"; + + /* An object to synchronize on. */ + private final static Object _lock=new Object(); + + /* A boolean to indicate a clean finish. */ + private static int _finished=0; + + /* A boolean to indicate an unsuccesful finish. */ + private static boolean _failed=false; + + /** + * Run the message consumer example. + * + * @param args Command line arguments. + */ + public static void main(String[] args) + { + Listener listener=new Listener(); + listener.runTest(); + } + + private void createListener(Context ctx,TopicSession session,String topicName) throws Exception{ + // lookup the topic usa + Topic topic=(Topic) ctx.lookup(topicName); + // Create a Message Subscriber + System.out.println(CLASS + ": Creating a Message Subscriber for topic " + topicName); + javax.jms.TopicSubscriber messageSubscriber=session.createSubscriber(topic); + + // Set a message listener on the messageConsumer + messageSubscriber.setMessageListener(new MyMessageListener(topicName)); + + } + + /** + * Start the example. + */ + private void runTest() + { + try + { + Properties properties=new Properties(); + properties.load(this.getClass().getResourceAsStream("pubsub.properties")); + + //Create the initial context + Context ctx=new InitialContext(properties); + + // Declare the connection + ConnectionFactory conFac=(ConnectionFactory) ctx.lookup("qpidConnectionfactory"); + TopicConnection connection=(TopicConnection) conFac.createConnection(); + + // As this application is using a TopicSubscriber we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println( + CLASS + ": Setting an ExceptionListener on the connection as sample uses a TopicSubscriber"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.exit(0); + } + }); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses + // the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + TopicSession session=connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + createListener(ctx,session,"usa"); + createListener(ctx,session,"europe"); + createListener(ctx,session,"news"); + createListener(ctx,session,"weather"); + + // Now the messageConsumer is set up we can start the connection + System.out.println(CLASS + ": Starting connection so TopicSubscriber can receive messages"); + connection.start(); + + // Wait for the messageConsumer to have received all the messages it needs + synchronized (_lock) + { + while (_finished < 4 && !_failed) + { + _lock.wait(); + } + } + + // If the MessageListener abruptly failed (probably due to receiving a non-text message) + if (_failed) + { + System.out.println(CLASS + ": This sample failed as it received unexpected messages"); + } + + // Close the connection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + ctx.close(); + } + catch (Exception exp) + { + exp.printStackTrace(); + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } + + private class MyMessageListener implements MessageListener + { + /* The topic this subscriber is subscribing to */ + private String _topicName; + + public MyMessageListener(String topicName) + { + _topicName=topicName; + } + + /** + * This method is required by the <CODE>MessageListener</CODE> interface. It + * will be invoked when messages are available. + * After receiving the final message it releases a lock so that the + * main program may continue. + * + * @param message The message. + */ + public void onMessage(Message message) + { + try + { + String text; + if (message instanceof TextMessage) + { + text=((TextMessage) message).getText(); + } + else + { + byte[] body=new byte[(int) ((BytesMessage) message).getBodyLength()]; + ((BytesMessage) message).readBytes(body); + text=new String(body); + } + if (text.equals("That's all, folks!")) + { + System.out.println(CLASS + ": Shutting down listener for " + _topicName); + synchronized (_lock) + { + _finished++; + _lock.notifyAll(); + } + } + else + { + System.out.println(CLASS + ": Received message for topic: " + _topicName + ": " + text); + } + } + catch (JMSException exp) + { + System.out.println(CLASS + ": Caught an exception handling a received message"); + exp.printStackTrace(); + synchronized (_lock) + { + _failed=true; + _lock.notifyAll(); + } + } + } + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java new file mode 100644 index 0000000000..360b2c6aed --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java @@ -0,0 +1,133 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.jmsexample.pubsub; + + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.util.Properties; + +/** + * Publish messages to topics + */ +public class Publisher +{ + /* Used in log output. */ + private static final String CLASS="Publisher"; + + /** + * Run the message producer example. + * @param args Command line arguments. + */ + public static void main(String[] args) + { + Publisher publisher = new Publisher(); + publisher.runTest(); + } + + private void runTest() + { + try + { + Properties properties=new Properties(); + properties.load(this.getClass().getResourceAsStream("pubsub.properties")); + + //Create the initial context + Context ctx=new InitialContext(properties); + + // Declare the connection + ConnectionFactory conFac=(ConnectionFactory) ctx.lookup("qpidConnectionfactory"); + TopicConnection connection= (TopicConnection) conFac.createConnection(); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + TopicSession session=connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a Message + TextMessage message; + System.out.println(CLASS + ": Creating a TestMessage to send to the topics"); + message=session.createTextMessage(); + + // lookup the topics usa.weather + Topic topic = (Topic)ctx.lookup("usa.weather"); + // Create a Message Publisher + System.out.println(CLASS + ": Creating a Message Publisher for topic usa.weather"); + TopicPublisher messagePublisher=session.createPublisher(topic); + publishMessages(message, messagePublisher); + + // lookup the topics usa.news + topic = (Topic)ctx.lookup("usa.news"); + // Create a Message Publisher + System.out.println(CLASS + ": Creating a Message Publisher for topic usa.news"); + messagePublisher=session.createPublisher(topic); + publishMessages(message, messagePublisher); + + // lookup the topics europe.weather + topic = (Topic)ctx.lookup("europe.weather"); + // Create a Message Publisher + System.out.println(CLASS + ": Creating a Message Publisher for topic europe.weather"); + messagePublisher=session.createPublisher(topic); + publishMessages(message, messagePublisher); + + // lookup the topics europe.news + topic = (Topic)ctx.lookup("europe.news"); + // Create a Message Publisher + System.out.println(CLASS + ": Creating a Message Publisher for topic europe.news"); + messagePublisher = session.createPublisher(topic); + publishMessages(message, messagePublisher); + + // send the final message + message=session.createTextMessage("That's all, folks!"); + topic = (Topic)ctx.lookup("control"); + // Create a Message Publisher + messagePublisher = session.createPublisher(topic); + messagePublisher + .send(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + + // Close the connection to the broker + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + ctx.close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } + + private void publishMessages(TextMessage message, TopicPublisher messagePublisher) throws JMSException + { + // Loop to publish 5 messages. + for (int i=1; i <= 6; i++) + { + message.setText("message " + i); + System.out.println(CLASS + ": Sending " + message.getText()); + messagePublisher + .send(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties new file mode 100644 index 0000000000..91c3de721b --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/pubsub.properties @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' + +# register some topics in JNDI using the form +# topic.[jndiName] = [physicalName] +topic.usa.weather = usa.weather,control +topic.usa.news = usa.news,control +topic.europe.weather = europe.weather,control +topic.europe.news = europe.news,control +topic.weather = #.weather,control +topic.news = #.news,control +topic.europe = europe.#,control +topic.usa = usa.#,control +topic.control = control
\ No newline at end of file diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify new file mode 100644 index 0000000000..588a086752 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +cpp=$CPP/pub-sub + +topic_listener_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.pubsub.Listener +} + +topic_publisher_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.pubsub.Publisher +} + +background "can receive messages" topic_listener_java +clients topic_publisher_java +outputs ./topic_publisher_java.out "topic_listener_java.out | remove_uuid | sort" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify.in new file mode 100644 index 0000000000..6e3074e611 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify.in @@ -0,0 +1,95 @@ +==== topic_publisher_java.out +Publisher: Creating a non-transacted, auto-acknowledged session +Publisher: Creating a TestMessage to send to the topics +Publisher: Creating a Message Publisher for topic usa.weather +Publisher: Sending message 1 +Publisher: Sending message 2 +Publisher: Sending message 3 +Publisher: Sending message 4 +Publisher: Sending message 5 +Publisher: Sending message 6 +Publisher: Creating a Message Publisher for topic usa.news +Publisher: Sending message 1 +Publisher: Sending message 2 +Publisher: Sending message 3 +Publisher: Sending message 4 +Publisher: Sending message 5 +Publisher: Sending message 6 +Publisher: Creating a Message Publisher for topic europe.weather +Publisher: Sending message 1 +Publisher: Sending message 2 +Publisher: Sending message 3 +Publisher: Sending message 4 +Publisher: Sending message 5 +Publisher: Sending message 6 +Publisher: Creating a Message Publisher for topic europe.news +Publisher: Sending message 1 +Publisher: Sending message 2 +Publisher: Sending message 3 +Publisher: Sending message 4 +Publisher: Sending message 5 +Publisher: Sending message 6 +Publisher: Closing connection +Publisher: Closing JNDI context +==== topic_listener_java.out | remove_uuid | sort +Listener: Closing connection +Listener: Closing JNDI context +Listener: Creating a Message Subscriber for topic europe +Listener: Creating a Message Subscriber for topic news +Listener: Creating a Message Subscriber for topic usa +Listener: Creating a Message Subscriber for topic weather +Listener: Creating a non-transacted, auto-acknowledged session +Listener: Received message for topic: europe: message 1 +Listener: Received message for topic: europe: message 1 +Listener: Received message for topic: europe: message 2 +Listener: Received message for topic: europe: message 2 +Listener: Received message for topic: europe: message 3 +Listener: Received message for topic: europe: message 3 +Listener: Received message for topic: europe: message 4 +Listener: Received message for topic: europe: message 4 +Listener: Received message for topic: europe: message 5 +Listener: Received message for topic: europe: message 5 +Listener: Received message for topic: europe: message 6 +Listener: Received message for topic: europe: message 6 +Listener: Received message for topic: news: message 1 +Listener: Received message for topic: news: message 1 +Listener: Received message for topic: news: message 2 +Listener: Received message for topic: news: message 2 +Listener: Received message for topic: news: message 3 +Listener: Received message for topic: news: message 3 +Listener: Received message for topic: news: message 4 +Listener: Received message for topic: news: message 4 +Listener: Received message for topic: news: message 5 +Listener: Received message for topic: news: message 5 +Listener: Received message for topic: news: message 6 +Listener: Received message for topic: news: message 6 +Listener: Received message for topic: usa: message 1 +Listener: Received message for topic: usa: message 1 +Listener: Received message for topic: usa: message 2 +Listener: Received message for topic: usa: message 2 +Listener: Received message for topic: usa: message 3 +Listener: Received message for topic: usa: message 3 +Listener: Received message for topic: usa: message 4 +Listener: Received message for topic: usa: message 4 +Listener: Received message for topic: usa: message 5 +Listener: Received message for topic: usa: message 5 +Listener: Received message for topic: usa: message 6 +Listener: Received message for topic: usa: message 6 +Listener: Received message for topic: weather: message 1 +Listener: Received message for topic: weather: message 1 +Listener: Received message for topic: weather: message 2 +Listener: Received message for topic: weather: message 2 +Listener: Received message for topic: weather: message 3 +Listener: Received message for topic: weather: message 3 +Listener: Received message for topic: weather: message 4 +Listener: Received message for topic: weather: message 4 +Listener: Received message for topic: weather: message 5 +Listener: Received message for topic: weather: message 5 +Listener: Received message for topic: weather: message 6 +Listener: Received message for topic: weather: message 6 +Listener: Setting an ExceptionListener on the connection as sample uses a TopicSubscriber +Listener: Shutting down listener for europe +Listener: Shutting down listener for news +Listener: Shutting down listener for usa +Listener: Shutting down listener for weather +Listener: Starting connection so TopicSubscriber can receive messages diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_cpp_java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_cpp_java new file mode 100644 index 0000000000..9276b3e21b --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_cpp_java @@ -0,0 +1,10 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +cpp=$CPP/pub-sub + +topic_listener_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.pubsub.Listener +} + +background "can receive messages" topic_listener_java +clients $cpp/topic_publisher +outputs $cpp/topic_publisher.out "topic_listener_java.out | remove_uuid | sort" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_cpp_java.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_cpp_java.in new file mode 100644 index 0000000000..62cc76baec --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_cpp_java.in @@ -0,0 +1,55 @@ +==== topic_publisher.out +==== topic_listener_java.out | remove_uuid | sort +Listener: Closing connection +Listener: Closing JNDI context +Listener: Creating a Message Subscriber for topic europe +Listener: Creating a Message Subscriber for topic news +Listener: Creating a Message Subscriber for topic usa +Listener: Creating a Message Subscriber for topic weather +Listener: Creating a non-transacted, auto-acknowledged session +Listener: Received message for topic: europe: Message 0 +Listener: Received message for topic: europe: Message 0 +Listener: Received message for topic: europe: Message 1 +Listener: Received message for topic: europe: Message 1 +Listener: Received message for topic: europe: Message 2 +Listener: Received message for topic: europe: Message 2 +Listener: Received message for topic: europe: Message 3 +Listener: Received message for topic: europe: Message 3 +Listener: Received message for topic: europe: Message 4 +Listener: Received message for topic: europe: Message 4 +Listener: Received message for topic: news: Message 0 +Listener: Received message for topic: news: Message 0 +Listener: Received message for topic: news: Message 1 +Listener: Received message for topic: news: Message 1 +Listener: Received message for topic: news: Message 2 +Listener: Received message for topic: news: Message 2 +Listener: Received message for topic: news: Message 3 +Listener: Received message for topic: news: Message 3 +Listener: Received message for topic: news: Message 4 +Listener: Received message for topic: news: Message 4 +Listener: Received message for topic: usa: Message 0 +Listener: Received message for topic: usa: Message 0 +Listener: Received message for topic: usa: Message 1 +Listener: Received message for topic: usa: Message 1 +Listener: Received message for topic: usa: Message 2 +Listener: Received message for topic: usa: Message 2 +Listener: Received message for topic: usa: Message 3 +Listener: Received message for topic: usa: Message 3 +Listener: Received message for topic: usa: Message 4 +Listener: Received message for topic: usa: Message 4 +Listener: Received message for topic: weather: Message 0 +Listener: Received message for topic: weather: Message 0 +Listener: Received message for topic: weather: Message 1 +Listener: Received message for topic: weather: Message 1 +Listener: Received message for topic: weather: Message 2 +Listener: Received message for topic: weather: Message 2 +Listener: Received message for topic: weather: Message 3 +Listener: Received message for topic: weather: Message 3 +Listener: Received message for topic: weather: Message 4 +Listener: Received message for topic: weather: Message 4 +Listener: Setting an ExceptionListener on the connection as sample uses a TopicSubscriber +Listener: Shutting down listener for europe +Listener: Shutting down listener for news +Listener: Shutting down listener for usa +Listener: Shutting down listener for weather +Listener: Starting connection so TopicSubscriber can receive messages diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_java_cpp b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_java_cpp new file mode 100644 index 0000000000..af22b3b82c --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_java_cpp @@ -0,0 +1,10 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +cpp=$CPP/pub-sub + +topic_publisher_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.pubsub.Publisher +} + +background "Listening" $cpp/topic_listener +clients topic_publisher_java +outputs ./topic_publisher_java.out "$cpp/topic_listener.out | remove_uuid | sort" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_java_cpp.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_java_cpp.in new file mode 100644 index 0000000000..8c5c26eaca --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_java_cpp.in @@ -0,0 +1,99 @@ +==== topic_publisher_java.out +Publisher: Creating a non-transacted, auto-acknowledged session +Publisher: Creating a TestMessage to send to the topics +Publisher: Creating a Message Publisher for topic usa.weather +Publisher: Sending message 1 +Publisher: Sending message 2 +Publisher: Sending message 3 +Publisher: Sending message 4 +Publisher: Sending message 5 +Publisher: Sending message 6 +Publisher: Creating a Message Publisher for topic usa.news +Publisher: Sending message 1 +Publisher: Sending message 2 +Publisher: Sending message 3 +Publisher: Sending message 4 +Publisher: Sending message 5 +Publisher: Sending message 6 +Publisher: Creating a Message Publisher for topic europe.weather +Publisher: Sending message 1 +Publisher: Sending message 2 +Publisher: Sending message 3 +Publisher: Sending message 4 +Publisher: Sending message 5 +Publisher: Sending message 6 +Publisher: Creating a Message Publisher for topic europe.news +Publisher: Sending message 1 +Publisher: Sending message 2 +Publisher: Sending message 3 +Publisher: Sending message 4 +Publisher: Sending message 5 +Publisher: Sending message 6 +Publisher: Closing connection +Publisher: Closing JNDI context +==== topic_listener.out | remove_uuid | sort +Declaring queue: europe +Declaring queue: news +Declaring queue: usa +Declaring queue: weather +Listening for messages ... +Message: message 1 from europe +Message: message 1 from europe +Message: message 1 from news +Message: message 1 from news +Message: message 1 from usa +Message: message 1 from usa +Message: message 1 from weather +Message: message 1 from weather +Message: message 2 from europe +Message: message 2 from europe +Message: message 2 from news +Message: message 2 from news +Message: message 2 from usa +Message: message 2 from usa +Message: message 2 from weather +Message: message 2 from weather +Message: message 3 from europe +Message: message 3 from europe +Message: message 3 from news +Message: message 3 from news +Message: message 3 from usa +Message: message 3 from usa +Message: message 3 from weather +Message: message 3 from weather +Message: message 4 from europe +Message: message 4 from europe +Message: message 4 from news +Message: message 4 from news +Message: message 4 from usa +Message: message 4 from usa +Message: message 4 from weather +Message: message 4 from weather +Message: message 5 from europe +Message: message 5 from europe +Message: message 5 from news +Message: message 5 from news +Message: message 5 from usa +Message: message 5 from usa +Message: message 5 from weather +Message: message 5 from weather +Message: message 6 from europe +Message: message 6 from europe +Message: message 6 from news +Message: message 6 from news +Message: message 6 from usa +Message: message 6 from usa +Message: message 6 from weather +Message: message 6 from weather +Message: That's all, folks! from europe +Message: That's all, folks! from news +Message: That's all, folks! from usa +Message: That's all, folks! from weather +Shutting down listener for europe +Shutting down listener for news +Shutting down listener for usa +Shutting down listener for weather +Subscribing to queue europe +Subscribing to queue news +Subscribing to queue usa +Subscribing to queue weather diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_java_python b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_java_python new file mode 100644 index 0000000000..3758e0f014 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_java_python @@ -0,0 +1,10 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +py=$PYTHON_EXAMPLES/pubsub + +topic_publisher_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.pubsub.Publisher +} + +background "Queues created" $py/topic_subscriber.py +clients topic_publisher_java +outputs ./topic_publisher_java.out "$py/topic_subscriber.py.out | remove_uuid | sort" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_java_python.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_java_python.in new file mode 100644 index 0000000000..92184201d0 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_java_python.in @@ -0,0 +1,95 @@ +==== topic_publisher_java.out +Publisher: Creating a non-transacted, auto-acknowledged session +Publisher: Creating a TestMessage to send to the topics +Publisher: Creating a Message Publisher for topic usa.weather +Publisher: Sending message 1 +Publisher: Sending message 2 +Publisher: Sending message 3 +Publisher: Sending message 4 +Publisher: Sending message 5 +Publisher: Sending message 6 +Publisher: Creating a Message Publisher for topic usa.news +Publisher: Sending message 1 +Publisher: Sending message 2 +Publisher: Sending message 3 +Publisher: Sending message 4 +Publisher: Sending message 5 +Publisher: Sending message 6 +Publisher: Creating a Message Publisher for topic europe.weather +Publisher: Sending message 1 +Publisher: Sending message 2 +Publisher: Sending message 3 +Publisher: Sending message 4 +Publisher: Sending message 5 +Publisher: Sending message 6 +Publisher: Creating a Message Publisher for topic europe.news +Publisher: Sending message 1 +Publisher: Sending message 2 +Publisher: Sending message 3 +Publisher: Sending message 4 +Publisher: Sending message 5 +Publisher: Sending message 6 +Publisher: Closing connection +Publisher: Closing JNDI context +==== topic_subscriber.py.out | remove_uuid | sort +message 1 +message 1 +message 1 +message 1 +message 1 +message 1 +message 1 +message 1 +message 2 +message 2 +message 2 +message 2 +message 2 +message 2 +message 2 +message 2 +message 3 +message 3 +message 3 +message 3 +message 3 +message 3 +message 3 +message 3 +message 4 +message 4 +message 4 +message 4 +message 4 +message 4 +message 4 +message 4 +message 5 +message 5 +message 5 +message 5 +message 5 +message 5 +message 5 +message 5 +message 6 +message 6 +message 6 +message 6 +message 6 +message 6 +message 6 +message 6 +Messages on 'europe' queue: +Messages on 'news' queue: +Messages on 'usa' queue: +Messages on 'weather' queue: +Queues created - please start the topic producer +Subscribing local queue 'local_europe' to europe-' +Subscribing local queue 'local_news' to news-' +Subscribing local queue 'local_usa' to usa-' +Subscribing local queue 'local_weather' to weather-' +That's all, folks! +That's all, folks! +That's all, folks! +That's all, folks! diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_python_java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_python_java new file mode 100644 index 0000000000..c2b516f376 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_python_java @@ -0,0 +1,10 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +py=$PYTHON_EXAMPLES/pubsub + +topic_listener_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.pubsub.Listener +} + +background "can receive messages" topic_listener_java +clients $py/topic_publisher.py +outputs $py/topic_publisher.py.out "topic_listener_java.out | remove_uuid | sort" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_python_java.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_python_java.in new file mode 100644 index 0000000000..68b96cba2b --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/verify_python_java.in @@ -0,0 +1,55 @@ +==== topic_publisher.py.out +==== topic_listener_java.out | remove_uuid | sort +Listener: Closing connection +Listener: Closing JNDI context +Listener: Creating a Message Subscriber for topic europe +Listener: Creating a Message Subscriber for topic news +Listener: Creating a Message Subscriber for topic usa +Listener: Creating a Message Subscriber for topic weather +Listener: Creating a non-transacted, auto-acknowledged session +Listener: Received message for topic: europe: europe.news 0 +Listener: Received message for topic: europe: europe.news 1 +Listener: Received message for topic: europe: europe.news 2 +Listener: Received message for topic: europe: europe.news 3 +Listener: Received message for topic: europe: europe.news 4 +Listener: Received message for topic: europe: europe.weather 0 +Listener: Received message for topic: europe: europe.weather 1 +Listener: Received message for topic: europe: europe.weather 2 +Listener: Received message for topic: europe: europe.weather 3 +Listener: Received message for topic: europe: europe.weather 4 +Listener: Received message for topic: news: europe.news 0 +Listener: Received message for topic: news: europe.news 1 +Listener: Received message for topic: news: europe.news 2 +Listener: Received message for topic: news: europe.news 3 +Listener: Received message for topic: news: europe.news 4 +Listener: Received message for topic: news: usa.news 0 +Listener: Received message for topic: news: usa.news 1 +Listener: Received message for topic: news: usa.news 2 +Listener: Received message for topic: news: usa.news 3 +Listener: Received message for topic: news: usa.news 4 +Listener: Received message for topic: usa: usa.news 0 +Listener: Received message for topic: usa: usa.news 1 +Listener: Received message for topic: usa: usa.news 2 +Listener: Received message for topic: usa: usa.news 3 +Listener: Received message for topic: usa: usa.news 4 +Listener: Received message for topic: usa: usa.weather 0 +Listener: Received message for topic: usa: usa.weather 1 +Listener: Received message for topic: usa: usa.weather 2 +Listener: Received message for topic: usa: usa.weather 3 +Listener: Received message for topic: usa: usa.weather 4 +Listener: Received message for topic: weather: europe.weather 0 +Listener: Received message for topic: weather: europe.weather 1 +Listener: Received message for topic: weather: europe.weather 2 +Listener: Received message for topic: weather: europe.weather 3 +Listener: Received message for topic: weather: europe.weather 4 +Listener: Received message for topic: weather: usa.weather 0 +Listener: Received message for topic: weather: usa.weather 1 +Listener: Received message for topic: weather: usa.weather 2 +Listener: Received message for topic: weather: usa.weather 3 +Listener: Received message for topic: weather: usa.weather 4 +Listener: Setting an ExceptionListener on the connection as sample uses a TopicSubscriber +Listener: Shutting down listener for europe +Listener: Shutting down listener for news +Listener: Shutting down listener for usa +Listener: Shutting down listener for weather +Listener: Starting connection so TopicSubscriber can receive messages diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/Client.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/Client.java new file mode 100644 index 0000000000..0589a3801b --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/Client.java @@ -0,0 +1,153 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.jmsexample.requestResponse; + + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.util.Properties; + +/** + * This example illustrates the use of the JMS utility class <code>QueueRequestor</code> + * which provides a synchronous RPC-like abstraction using temporary destinations + * to deliver responses back to the client. + */ +public class Client +{ + /* Used in log output. */ + private static final String CLASS="Client"; + + + /** + * Run the message requestor example. + * + * @param args Command line arguments. + */ + public static void main(String[] args) + { + Client requestor=new Client(); + requestor.runTest(); + } + + /** + * Start the example. + */ + private void runTest() + { + try + { + // Load JNDI properties + Properties properties=new Properties(); + properties.load(this.getClass().getResourceAsStream("requestResponse.properties")); + + //Create the initial context + Context ctx=new InitialContext(properties); + + // Lookup the connection factory + ConnectionFactory conFac = (ConnectionFactory) ctx.lookup("qpidConnectionfactory"); + + // create the connection + QueueConnection connection = (QueueConnection) conFac.createConnection(); + + // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println( + CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.exit(0); + } + }); + + // Create a session on the connection. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + // Lookup the destination + Queue destination = (Queue) ctx.lookup("requestQueue"); + + // Create a QueueRequestor + System.out.println(CLASS + ": Creating a QueueRequestor"); + + QueueRequestor requestor = new QueueRequestor(session, destination); + + // Now start the connection + System.out.println(CLASS + ": Starting connection"); + connection.start(); + + // Create a message to send as a request for service + TextMessage request; + + // Send some messages to the server's request queue + String[] messages = {"Twas brillig, and the slithy toves", + "Did gire and gymble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe."}; + + // Get the number of times that this sample should request service + for (String message : messages) + { + request = session.createTextMessage(message); + sendReceive(request, requestor); + } + + // Close the connection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + ctx.close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } + + private void sendReceive(TextMessage request, QueueRequestor requestor) throws JMSException + { + Message response; + response=requestor.request(request); + System.out.println(CLASS + ": \tRequest Content= " + request.getText()); + // Print out the details of the response received + String text; + if (response instanceof TextMessage) + { + text=((TextMessage) response).getText(); + } + else + { + byte[] body=new byte[(int) ((BytesMessage) response).getBodyLength()]; + ((BytesMessage) response).readBytes(body); + text=new String(body); + } + System.out.println(CLASS + ": \tResponse Content= " + text); + } +} + diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/Server.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/Server.java new file mode 100644 index 0000000000..2ac349a879 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/Server.java @@ -0,0 +1,163 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.jmsexample.requestResponse; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.util.Properties; + +/** + * The example creates a MessageConsumer on the specified + * Destination which is used to synchronously consume messages. If a + * received message has a ReplyTo header then a new response message is sent + * to that specified destination. + */ +public class Server +{ + /* Used in log output. */ + private static final String CLASS="Server"; + + + /** + * Run the message mirror example. + * + * @param args Command line arguments. + */ + public static void main(String[] args) + { + Server server=new Server(); + server.runTest(); + } + + /** + * Start the example. + */ + private void runTest() + { + try + { + // Load JNDI properties + Properties properties=new Properties(); + properties.load(this.getClass().getResourceAsStream("requestResponse.properties")); + + //Create the initial context + Context ctx=new InitialContext(properties); + + // Lookup the connection factory + ConnectionFactory conFac=(ConnectionFactory) ctx.lookup("qpidConnectionfactory"); + + // create the connection + Connection connection=conFac.createConnection(); + + // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println( + CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.exit(0); + } + }); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses + // the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + + Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Lookup the destination + Queue destination = (Queue) ctx.lookup("requestQueue"); + + + // Create a MessageConsumer + System.out.println(CLASS + ": Creating a MessageConsumer"); + MessageConsumer messageConsumer=session.createConsumer(destination); + + /** + * Create a MessageProducer + */ + System.out.println(CLASS + ": Creating a MessageProducer"); + MessageProducer messageProducer; + + // Now the messageConsumer is set up we can start the connection + System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages"); + connection.start(); + + // Cycle round until all the messages are consumed. + Message requestMessage; + TextMessage responseMessage; + boolean end=false; + while (!end) + { + System.out.println(CLASS + ": Receiving the message"); + + requestMessage=messageConsumer.receive(); + + String text; + if (requestMessage instanceof TextMessage) + { + text=((TextMessage) requestMessage).getText(); + } + else + { + byte[] body=new byte[(int) ((BytesMessage) requestMessage).getBodyLength()]; + ((BytesMessage) requestMessage).readBytes(body); + text=new String(body); + } + + // Now bounce the message if a ReplyTo header was set. + if (requestMessage.getJMSReplyTo() != null) + { + System.out.println(CLASS + ": Activating response queue listener"); + responseMessage=session.createTextMessage(); + + responseMessage.setText(text.toUpperCase()); + System.out.println(CLASS + ": \tResponse = " + responseMessage.getText()); + + messageProducer=session.createProducer(requestMessage.getJMSReplyTo()); + messageProducer.send(responseMessage); + } + System.out.println(); + } + + // Close the connection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + ctx.close(); + } + catch (Exception exp) + { + exp.printStackTrace(); + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties new file mode 100644 index 0000000000..c467a4f123 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/requestResponse.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' + +# register some queues in JNDI using the form +# queue.[jndiName] = [physicalName] +queue.requestQueue = request diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify new file mode 100644 index 0000000000..79f22aa88a --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +cpp=$CPP/pub-sub + +client_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.requestResponse.Client +} + +server_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.requestResponse.Server +} + +background "can receive messages" server_java +clients client_java +kill %% +outputs "client_java.out | remove_uuid" "server_java.out | remove_uuid" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify.in new file mode 100644 index 0000000000..f2c244dea6 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify.in @@ -0,0 +1,38 @@ +==== client_java.out | remove_uuid +Client: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Client: Creating a non-transacted, auto-acknowledged session +Client: Creating a QueueRequestor +Client: Starting connection +Client: Request Content= Twas brillig, and the slithy toves +Client: Response Content= TWAS BRILLIG, AND THE SLITHY TOVES +Client: Request Content= Did gire and gymble in the wabe. +Client: Response Content= DID GIRE AND GYMBLE IN THE WABE. +Client: Request Content= All mimsy were the borogroves, +Client: Response Content= ALL MIMSY WERE THE BOROGROVES, +Client: Request Content= And the mome raths outgrabe. +Client: Response Content= AND THE MOME RATHS OUTGRABE. +Client: Closing connection +Client: Closing JNDI context +==== server_java.out | remove_uuid +Server: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Server: Creating a non-transacted, auto-acknowledged session +Server: Creating a MessageConsumer +Server: Creating a MessageProducer +Server: Starting connection so MessageConsumer can receive messages +Server: Receiving the message +Server: Activating response queue listener +Server: Response = TWAS BRILLIG, AND THE SLITHY TOVES + +Server: Receiving the message +Server: Activating response queue listener +Server: Response = DID GIRE AND GYMBLE IN THE WABE. + +Server: Receiving the message +Server: Activating response queue listener +Server: Response = ALL MIMSY WERE THE BOROGROVES, + +Server: Receiving the message +Server: Activating response queue listener +Server: Response = AND THE MOME RATHS OUTGRABE. + +Server: Receiving the message diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_cpp_java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_cpp_java new file mode 100644 index 0000000000..6ef1b3b7e3 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_cpp_java @@ -0,0 +1,12 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +cpp=$CPP/request-response + +client_java() +{ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.requestResponse.Client +} + +background "Waiting" $cpp/server +clients client_java +kill %% +outputs "client_java.out | remove_uuid" "$cpp/server.out | remove_uuid" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_cpp_java.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_cpp_java.in new file mode 100644 index 0000000000..4b7e7e0741 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_cpp_java.in @@ -0,0 +1,22 @@ +==== client_java.out | remove_uuid +Client: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Client: Creating a non-transacted, auto-acknowledged session +Client: Creating a QueueRequestor +Client: Starting connection +Client: Request Content= Twas brillig, and the slithy toves +Client: Response Content= TWAS BRILLIG, AND THE SLITHY TOVES +Client: Request Content= Did gire and gymble in the wabe. +Client: Response Content= DID GIRE AND GYMBLE IN THE WABE. +Client: Request Content= All mimsy were the borogroves, +Client: Response Content= ALL MIMSY WERE THE BOROGROVES, +Client: Request Content= And the mome raths outgrabe. +Client: Response Content= AND THE MOME RATHS OUTGRABE. +Client: Closing connection +Client: Closing JNDI context +==== server.out | remove_uuid +Activating request queue listener for: request +Waiting for requests +Request: Twas brillig, and the slithy toves (TempQueue) +Request: Did gire and gymble in the wabe. (TempQueue) +Request: All mimsy were the borogroves, (TempQueue) +Request: And the mome raths outgrabe. (TempQueue) diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_cpp b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_cpp new file mode 100644 index 0000000000..a1c5aa325d --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_cpp @@ -0,0 +1,12 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +cpp=$CPP/request-response + +server_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.requestResponse.Server +} + +background "can receive messages" server_java +clients $cpp/client +#ps -ao pid,cmd | awk '/qpid-client-<version>.jar/{ print $1 }' | xargs -r kill +kill %% +outputs "$cpp/client.out | remove_uuid" "server_java.out | remove_uuid" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_cpp.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_cpp.in new file mode 100644 index 0000000000..9cccf39393 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_cpp.in @@ -0,0 +1,35 @@ +==== client.out | remove_uuid +Activating response queue listener for: client +Request: Twas brillig, and the slithy toves +Request: Did gire and gymble in the wabe. +Request: All mimsy were the borogroves, +Request: And the mome raths outgrabe. +Waiting for all responses to arrive ... +Response: TWAS BRILLIG, AND THE SLITHY TOVES +Response: DID GIRE AND GYMBLE IN THE WABE. +Response: ALL MIMSY WERE THE BOROGROVES, +Response: AND THE MOME RATHS OUTGRABE. +Shutting down listener for client +==== server_java.out | remove_uuid +Server: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Server: Creating a non-transacted, auto-acknowledged session +Server: Creating a MessageConsumer +Server: Creating a MessageProducer +Server: Starting connection so MessageConsumer can receive messages +Server: Receiving the message +Server: Activating response queue listener +Server: Response = TWAS BRILLIG, AND THE SLITHY TOVES + +Server: Receiving the message +Server: Activating response queue listener +Server: Response = DID GIRE AND GYMBLE IN THE WABE. + +Server: Receiving the message +Server: Activating response queue listener +Server: Response = ALL MIMSY WERE THE BOROGROVES, + +Server: Receiving the message +Server: Activating response queue listener +Server: Response = AND THE MOME RATHS OUTGRABE. + +Server: Receiving the message diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_python b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_python new file mode 100644 index 0000000000..0760952527 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_python @@ -0,0 +1,11 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +py=$PYTHON_EXAMPLES/request-response + +server_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.requestResponse.Server +} + +background "can receive messages" server_java +clients $py/client.py +kill %% +outputs "$py/client.py.out | remove_uuid" "server_java.out | remove_uuid" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_python.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_python.in new file mode 100644 index 0000000000..bffe9d2842 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_java_python.in @@ -0,0 +1,34 @@ +==== client.py.out | remove_uuid +Request: Twas brillig, and the slithy toves +Request: Did gyre and gimble in the wabe. +Request: All mimsy were the borogroves, +Request: And the mome raths outgrabe. +Messages on queue: reply_to: +Response: TWAS BRILLIG, AND THE SLITHY TOVES +Response: DID GYRE AND GIMBLE IN THE WABE. +Response: ALL MIMSY WERE THE BOROGROVES, +Response: AND THE MOME RATHS OUTGRABE. +No more messages! +==== server_java.out | remove_uuid +Server: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Server: Creating a non-transacted, auto-acknowledged session +Server: Creating a MessageConsumer +Server: Creating a MessageProducer +Server: Starting connection so MessageConsumer can receive messages +Server: Receiving the message +Server: Activating response queue listener +Server: Response = TWAS BRILLIG, AND THE SLITHY TOVES + +Server: Receiving the message +Server: Activating response queue listener +Server: Response = DID GYRE AND GIMBLE IN THE WABE. + +Server: Receiving the message +Server: Activating response queue listener +Server: Response = ALL MIMSY WERE THE BOROGROVES, + +Server: Receiving the message +Server: Activating response queue listener +Server: Response = AND THE MOME RATHS OUTGRABE. + +Server: Receiving the message diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_python_java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_python_java new file mode 100644 index 0000000000..6ea526e914 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_python_java @@ -0,0 +1,11 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +py=$PYTHON_EXAMPLES/request-response + +client_java(){ +java -Dlog4j.configuration=file://"$JAVA"/log4j.xml -cp "$CLASSPATH" org.apache.qpid.example.jmsexample.requestResponse.Client +} + +background "Request server running" $py/server.py +clients client_java +kill %% +outputs "client_java.out | remove_uuid" "$py/server.py.out | remove_uuid" diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_python_java.in b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_python_java.in new file mode 100644 index 0000000000..6e53ca3281 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/verify_python_java.in @@ -0,0 +1,18 @@ +==== client_java.out | remove_uuid +Client: Setting an ExceptionListener on the connection as sample uses a MessageConsumer +Client: Creating a non-transacted, auto-acknowledged session +Client: Creating a QueueRequestor +Client: Starting connection +Client: Request Content= Twas brillig, and the slithy toves +Client: Response Content= TWAS BRILLIG, AND THE SLITHY TOVES +Client: Request Content= Did gire and gymble in the wabe. +Client: Response Content= DID GIRE AND GYMBLE IN THE WABE. +Client: Request Content= All mimsy were the borogroves, +Client: Response Content= ALL MIMSY WERE THE BOROGROVES, +Client: Request Content= And the mome raths outgrabe. +Client: Response Content= AND THE MOME RATHS OUTGRABE. +Client: Closing connection +Client: Closing JNDI context +==== server.py.out | remove_uuid +Request server running - run your client now. +(Times out after 100 seconds ...) diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/QueueToTopic.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/QueueToTopic.java new file mode 100644 index 0000000000..f3bf9f8686 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/QueueToTopic.java @@ -0,0 +1,259 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.jmsexample.transacted; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.util.Properties; + +/** + * Transactional message example sends a number of messages to a Queue + * and then uses a transacted session to move them from the Queue to a Topic. + * <p/> + * <p>The program completes the following steps: + * <ul> + * <li>Publish the specified number of messages to the queue.</li> + * <li>Within a transacted session consume all messages from the queue + * and publish them to the topic.</li> + * <li>By default commit the transacted session, unless the "<code>-rollback true</code>" + * option is specified in which case roll it back.</li> + * <li>Check for outstanding messages on the queue.</li> + * <li>Check for outstanding messages on the topic.</li> + * </ul> + * <p/> + */ +public class QueueToTopic +{ + /* Used in log output. */ + private static final String CLASS="QueueToTopic"; + + + /* Specify if the transaction is committed */ + private boolean _commit; + + /** + * Create a QueueToTopic client. + * + * @param commit Specifies if the transaction should be committed. + */ + public QueueToTopic(boolean commit) + { + _commit=commit; + } + + /** + * Run the message mover example. + * + * @param args Command line arguments. + */ + public static void main(String[] args) + { + boolean commit=true; + if (args.length > 1) + { + if (args[0].equalsIgnoreCase("-rollback")) + { + commit=!Boolean.getBoolean(args[1]); + } + } + QueueToTopic mover=new QueueToTopic(commit); + mover.runTest(); + } + + private void runTest() + { + try + { + // Load JNDI properties + Properties properties=new Properties(); + properties.load(this.getClass().getResourceAsStream("transacted.properties")); + + //Create the initial context + Context ctx=new InitialContext(properties); + + // Lookup the connection factory + ConnectionFactory conFac=(ConnectionFactory) ctx.lookup("qpidConnectionfactory"); + // create the connection + Connection connection=conFac.createConnection(); + + // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println( + CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.exit(0); + } + }); + + // Start the connection + connection.start(); + + /** + * Create nonTransactedSession. This non-transacted auto-ack session is used to create the MessageProducer + * that is used to populate the queue and the MessageConsumer that is used to consume the messages + * from the topic. + */ + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + Session nonTransactedSession=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Lookup the queue + Queue queue=(Queue) ctx.lookup("transactedQueue"); + + // Lookup the topic + Topic topic=(Topic) ctx.lookup("transactedTopic"); + + // Make sure that the queue is empty + System.out.print(CLASS + ": Purging messages from queue..."); + MessageConsumer queueMessageConsumer=nonTransactedSession.createConsumer(queue); + Message purgedMessage; + int numberPurged=-1; + do + { + purgedMessage=queueMessageConsumer.receiveNoWait(); + numberPurged++; + } + while (purgedMessage != null); + System.out.println(numberPurged + " message(s) purged."); + + // Create the MessageProducer for the queue + System.out.println(CLASS + ": Creating a MessageProducer for the queue"); + MessageProducer messageProducer=nonTransactedSession.createProducer(queue); + + // Now create the MessageConsumer for the topic + System.out.println(CLASS + ": Creating a MessageConsumer for the topic"); + MessageConsumer topicMessageConsumer=nonTransactedSession.createConsumer(topic); + + // Create a textMessage. We're using a TextMessage for this example. + System.out.println(CLASS + ": Creating a TestMessage to send to the destination"); + TextMessage textMessage=nonTransactedSession.createTextMessage("Sample text message"); + + // Loop to publish the requested number of messages to the queue. + for (int i=1; i <= 5; i++) + { + messageProducer + .send(textMessage, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, + Message.DEFAULT_TIME_TO_LIVE); + + // Print out details of textMessage just sent + System.out.println(CLASS + ": Message sent: " + i + " " + textMessage.getJMSMessageID()); + } + + // Create a new transacted Session to move the messages from the queue to the topic + Session transactedSession=connection.createSession(true, Session.SESSION_TRANSACTED); + + // Create a new message consumer from the queue + MessageConsumer transactedConsumer=transactedSession.createConsumer(queue); + + // Create a new message producer for the topic + MessageProducer transactedProducer=transactedSession.createProducer(topic); + + // Loop to consume the messages from the queue and publish them to the topic + Message receivedMessage; + for (int i=1; i <= 5; i++) + { + // Receive a message + receivedMessage=transactedConsumer.receive(); + System.out.println(CLASS + ": Moving message: " + i + " " + receivedMessage.getJMSMessageID()); + // Publish it to the topic + transactedProducer.send(receivedMessage); + } + + // Either commit or rollback the transacted session based on the command line args. + if (_commit) + { + System.out.println(CLASS + ": Committing transacted session."); + transactedSession.commit(); + } + else + { + System.out.println(CLASS + ": Rolling back transacted session."); + transactedSession.rollback(); + } + + // Now consume any outstanding messages on the queue + System.out.print(CLASS + ": Mopping up messages from queue"); + if (_commit) + { + System.out.print(" (expecting none)..."); + } + else + { + System.out.print(" (expecting " + 5 + ")..."); + } + + Message moppedMessage; + int numberMopped=0; + do + { + moppedMessage=queueMessageConsumer.receiveNoWait(); + if (moppedMessage != null) + { + numberMopped++; + } + } + while (moppedMessage != null); + System.out.println(numberMopped + " message(s) mopped."); + + // Now consume any outstanding messages for the topic subscriber + System.out.print(CLASS + ": Mopping up messages from topic"); + + if (_commit) + { + System.out.print(" (expecting " + 5 + ")..."); + } + else + { + System.out.print(" (expecting none)..."); + } + + numberMopped=0; + do + { + moppedMessage=topicMessageConsumer.receiveNoWait(); + if (moppedMessage != null) + { + numberMopped++; + } + } + while (moppedMessage != null); + System.out.println(numberMopped + " message(s) mopped."); + + // Close the QueueConnection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + ctx.close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties new file mode 100644 index 0000000000..d93d19eea0 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/transacted.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' + +# register some queues in JNDI using the form +# queue.[jndiName] = [physicalName] +queue.transactedQueue = transactedQueue + +# register some topics in JNDI using the form +# topic.[jndiName] = [physicalName] +topic.transactedTopic = transactedTopic
\ No newline at end of file diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java new file mode 100644 index 0000000000..1849f733e9 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java @@ -0,0 +1,163 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.publisher; + +import java.io.File; + +import javax.jms.JMSException; + + +import org.apache.qpid.example.shared.FileUtils; +import org.apache.qpid.example.shared.Statics; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +/** + * Class that sends message files to the Publisher to distribute + * using files as input + * Must set properties for host in properties file or uses in vm broker + */ +public class FileMessageDispatcher +{ + + protected static final Logger _logger = LoggerFactory.getLogger(FileMessageDispatcher.class); + + protected static Publisher _publisher = null; + + /** + * To use this main method you need to specify a path or file to use for input + * This class then uses file contents from the dir/file specified to generate + * messages to publish + * Intended to be a very simple way to get going with publishing using the broker + * @param args - must specify one value, the path to file(s) for publisher + */ + public static void main(String[] args) + { + + // Check command line args ok - must provide a path or file for us to dispatch + if (args.length == 0) + { + System.out.println("Usage: FileMessageDispatcher <filesToDispatch>" + ""); + } + else + { + try + { + // publish message(s) from file(s) to configured queue + publish(args[0]); + + // Move payload file(s) to archive location as no error + FileUtils.moveFileToNewDir(args[0], System.getProperties().getProperty(Statics.ARCHIVE_PATH)); + } + catch (Exception e) + { + // log error and exit + _logger.error("Error trying to dispatch message: " + e); + System.exit(1); + } + finally + { + // clean up before exiting + if (getPublisher() != null) + { + getPublisher().cleanup(); + } + } + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("Finished dispatching message"); + } + + System.exit(0); + } + + /** + * Publish the content of a file or files from a directory as messages + * @param path - from main args + * @throws JMSException + * @throws MessageFactoryException - if cannot create message from file content + */ + public static void publish(String path) throws JMSException, MessageFactoryException + { + File tempFile = new File(path); + if (tempFile.isDirectory()) + { + // while more files in dir publish them + File[] files = tempFile.listFiles(); + + if ((files == null) || (files.length == 0)) + { + _logger.info("FileMessageDispatcher - No files to publish in input directory: " + tempFile); + } + else + { + for (File file : files) + { + // Create message factory passing in payload path + FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), file.toString()); + + // Send the message generated from the payload using the _publisher + getPublisher().sendMessage(factory.createEventMessage()); + + } + } + } + else + { + // handle a single file + // Create message factory passing in payload path + FileMessageFactory factory = new FileMessageFactory(getPublisher().getSession(), tempFile.toString()); + + // Send the message generated from the payload using the _publisher + getPublisher().sendMessage(factory.createEventMessage()); + } + } + + /** + * Cleanup before exit + */ + public static void cleanup() + { + if (getPublisher() != null) + { + getPublisher().cleanup(); + } + } + + /** + * @return A Publisher instance + */ + private static Publisher getPublisher() + { + if (_publisher != null) + { + return _publisher; + } + + // Create a _publisher + _publisher = new Publisher(); + + return _publisher; + } + +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java new file mode 100644 index 0000000000..1240284a56 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.example.publisher; + +import org.apache.qpid.example.shared.FileUtils; +import org.apache.qpid.example.shared.Statics; + +import java.io.*; +import javax.jms.*; + +public class FileMessageFactory +{ + protected final Session _session; + protected final String _payload; + protected final String _filename; + + /** + * Contructs and instance using a filename from which content will be used to create message + * @param session + * @param filename + * @throws MessageFactoryException + */ + public FileMessageFactory(Session session, String filename) throws MessageFactoryException + { + try + { + _filename = filename; + _payload = FileUtils.getFileContent(filename); + _session = session; + } + catch (IOException e) + { + MessageFactoryException mfe = new MessageFactoryException(e.toString(), e); + throw mfe; + } + } + + /** + * Creates a text message and sets filename property on it + * The filename property is purely intended to provide visibility + * of file content passing trhough the broker using example classes + * @return Message - a TextMessage with content from file + * @throws JMSException + */ + public Message createEventMessage() throws JMSException + { + TextMessage msg = _session.createTextMessage(); + msg.setText(_payload); + msg.setStringProperty(Statics.FILENAME_PROPERTY, new File(_filename).getName()); + + return msg; + } + + /** + * Creates message from a string for use by the monitor + * @param session + * @param textMsg - message content + * @return Message - TextMessage with content from String + * @throws JMSException + */ + public static Message createSimpleEventMessage(Session session, String textMsg) throws JMSException + { + TextMessage msg = session.createTextMessage(); + msg.setText(textMsg); + + return msg; + } + + public Message createShutdownMessage() throws JMSException + { + return _session.createTextMessage("SHUTDOWN"); + } + + public Message createReportRequestMessage() throws JMSException + { + return _session.createTextMessage("REPORT"); + } + + public Message createReportResponseMessage(String msg) throws JMSException + { + return _session.createTextMessage(msg); + } + + public boolean isShutdown(Message m) + { + return checkText(m, "SHUTDOWN"); + } + + public boolean isReport(Message m) + { + return checkText(m, "REPORT"); + } + + public Object getReport(Message m) + { + try + { + return ((TextMessage) m).getText(); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + + return e.toString(); + } + } + + private static boolean checkText(Message m, String s) + { + try + { + return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + + return false; + } + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java new file mode 100644 index 0000000000..d709da6432 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.publisher; + +public class MessageFactoryException extends Exception +{ + public MessageFactoryException(String msg, Throwable t) + { + super(msg, t); + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java new file mode 100644 index 0000000000..3d16e01af4 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + + +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; + +/** + * Class that sends heartbeat messages to allow monitoring of message consumption Sends regular (currently 20 seconds + * apart) heartbeat message + */ +public class MonitorMessageDispatcher +{ + + private static final Logger _logger = LoggerFactory.getLogger(MonitorMessageDispatcher.class); + + protected static MonitorPublisher _monitorPublisher = null; + + protected static final String DEFAULT_MONITOR_PUB_NAME = "MonitorPublisher"; + + /** + * Easy entry point for running a message dispatcher for monitoring consumption + * Sends 1000 messages with no delay + * + * @param args + */ + public static void main(String[] args) + { + //Switch on logging appropriately for your app + try + { + int i =0; + while (i < 1000) + { + try + { + //endlessly publish messages to monitor queue + publish(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Dispatched monitor message"); + } + + //sleep for twenty seconds and then publish again - change if appropriate + //Thread.sleep(1000); + i++ ; + } + catch (UndeliveredMessageException a) + { + //trigger application specific failure handling here + _logger.error("Problem delivering monitor message"); + break; + } + } + } + catch (Exception e) + { + _logger.error("Error trying to dispatch AMS monitor message: " + e); + System.exit(1); + } + finally + { + if (getMonitorPublisher() != null) + { + getMonitorPublisher().cleanup(); + } + } + + System.exit(1); + } + + /** + * Publish heartbeat message + * + * @throws JMSException + * @throws UndeliveredMessageException + */ + public static void publish() throws JMSException, UndeliveredMessageException + { + //Send the message generated from the payload using the _publisher +// getMonitorPublisher().sendImmediateMessage +// (FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(),"monitor:" +System.currentTimeMillis())); + + getMonitorPublisher().sendMessage + (getMonitorPublisher()._session, + FileMessageFactory.createSimpleEventMessage(getMonitorPublisher().getSession(), "monitor:" + System.currentTimeMillis()), + DeliveryMode.PERSISTENT, false, true); + + } + + /** Cleanup publishers */ + public static void cleanup() + { + if (getMonitorPublisher() != null) + { + getMonitorPublisher().cleanup(); + } + + if (getMonitorPublisher() != null) + { + getMonitorPublisher().cleanup(); + } + } + + //Returns a _publisher for the monitor queue + private static MonitorPublisher getMonitorPublisher() + { + if (_monitorPublisher != null) + { + return _monitorPublisher; + } + + //Create a _publisher using failover details and constant for monitor queue + _monitorPublisher = new MonitorPublisher(); + + _monitorPublisher.setName(MonitorMessageDispatcher.DEFAULT_MONITOR_PUB_NAME); + return _monitorPublisher; + } + +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java new file mode 100644 index 0000000000..750f57d9dc --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + +import org.apache.qpid.client.BasicMessageProducer; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; + +/** + * Subclass of Publisher which uses QPID functionality to send a heartbeat message Note immediate flag not available via + * JMS MessageProducer + */ +public class MonitorPublisher extends Publisher +{ + + private static final Logger _log = LoggerFactory.getLogger(Publisher.class); + + BasicMessageProducer _producer; + + public MonitorPublisher() + { + super(); + } + + /* + * Publishes a message using given details + */ + public boolean sendMessage(Session session, Message message, int deliveryMode, + boolean immediate, boolean commit) throws UndeliveredMessageException + { + try + { + _producer = (BasicMessageProducer) session.createProducer(_destination); + + _producer.send(message, deliveryMode, immediate); + + if (commit) + { + //commit the message send and close the transaction + _session.commit(); + } + + } + catch (JMSException e) + { + //Have to assume our commit failed but do not rollback here as channel closed + _log.error("JMSException", e); + e.printStackTrace(); + throw new UndeliveredMessageException("Cannot deliver immediate message", e); + } + + _log.info(_name + " finished sending message: " + message); + return true; + } + + /* + * Publishes a non-persistent message using transacted session + */ + public boolean sendImmediateMessage(Message message) throws UndeliveredMessageException + { + try + { + _producer = (BasicMessageProducer) _session.createProducer(_destination); + + //Send message via our producer which is not persistent and is immediate + //NB: not available via jms interface MessageProducer + _producer.send(message, DeliveryMode.NON_PERSISTENT, true); + + //commit the message send and close the transaction + _session.commit(); + + } + catch (JMSException e) + { + //Have to assume our commit failed but do not rollback here as channel closed + _log.error("JMSException", e); + e.printStackTrace(); + throw new UndeliveredMessageException("Cannot deliver immediate message", e); + } + + _log.info(_name + " finished sending message: " + message); + return true; + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java new file mode 100644 index 0000000000..87fc543dbe --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.publisher; + +import org.apache.qpid.client.AMQConnectionFactory; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.DeliveryMode; +import javax.jms.Queue; +import javax.jms.MessageProducer; +import javax.jms.Connection; +import javax.jms.Session; + +import javax.naming.InitialContext; + +import org.apache.qpid.example.shared.InitialContextHelper; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +public class Publisher +{ + private static final Logger _log = LoggerFactory.getLogger(Publisher.class); + + protected InitialContextHelper _contextHelper; + + protected Connection _connection; + + protected Session _session; + + protected MessageProducer _producer; + + protected String _destinationDir; + + protected String _name = "Publisher"; + + protected Queue _destination; + + protected static final String _defaultDestinationDir = "/tmp"; + + /** + * Creates a Publisher instance using properties from example.properties + * See InitialContextHelper for details of how context etc created + */ + public Publisher() + { + try + { + //get an initial context from default properties + _contextHelper = new InitialContextHelper(null); + InitialContext ctx = _contextHelper.getInitialContext(); + + //then create a connection using the AMQConnectionFactory + AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local"); + _connection = cf.createConnection(); + + //create a transactional session + _session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + //lookup the example queue and use it + //Queue is non-exclusive and not deleted when last consumer detaches + _destination = (Queue) ctx.lookup("MyQueue"); + + //create a message producer + _producer = _session.createProducer(_destination); + + //set destination dir for files that have been processed + _destinationDir = _defaultDestinationDir; + + _connection.start(); + } + catch (Exception e) + { + e.printStackTrace(); + _log.error("Exception", e); + } + } + + /** + * Publishes a non-persistent message using transacted session + * Note that persistent is the default mode for send - so need to specify for transient + */ + public boolean sendMessage(Message message) + { + try + { + //Send message via our producer which is not persistent + _producer.send(message, DeliveryMode.NON_PERSISTENT, _producer.getPriority(), _producer.getTimeToLive()); + + //commit the message send and close the transaction + _session.commit(); + + } + catch (JMSException e) + { + //Have to assume our commit failed and rollback here + try + { + _session.rollback(); + _log.error("JMSException", e); + e.printStackTrace(); + return false; + } + catch (JMSException j) + { + _log.error("Unable to rollback publish transaction ",e); + return false; + } + } + + _log.info(_name + " finished sending message: " + message); + return true; + } + + /** + * Cleanup resources before exit + */ + public void cleanup() + { + try + { + if (_connection != null) + { + _connection.stop(); + _connection.close(); + } + _connection = null; + _producer = null; + } + catch(Exception e) + { + _log.error("Error trying to cleanup publisher " + e); + System.exit(1); + } + } + + /** + * Exposes session + * @return Session + */ + public Session getSession() + { + return _session; + } + + public String getDestinationDir() + { + return _destinationDir; + } + + public void setDestinationDir(String destinationDir) + { + _destinationDir = destinationDir; + } + + public String getName() + { + return _name; + } + + public void setName(String _name) { + this._name = _name; + } +} + diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java new file mode 100644 index 0000000000..245008b68a --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.publisher; + +/** + * Exception thrown by monitor when cannot send a message marked for immediate delivery + */ +public class UndeliveredMessageException extends Exception +{ + public UndeliveredMessageException(String msg, Throwable t) + { + super(msg, t); + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java new file mode 100644 index 0000000000..e32ee0ba73 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Client.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.naming.NamingException; + +/** + * An abstract base class that wraps up the creation of a JMS client utilising JNDI + */ +public abstract class Client +{ + protected ConnectionSetup _setup; + + protected Connection _connection; + protected Destination _destination; + protected Session _session; + + public Client(String destination) + { + if (destination == null) + { + destination = ConnectionSetup.TOPIC_JNDI_NAME; + } + + try + { + _setup = new ConnectionSetup(); + } + catch (NamingException e) + { + //ignore + } + + if (_setup != null) + { + try + { + _connection = _setup.getConnectionFactory().createConnection(); + _destination = _setup.getDestination(destination); + } + catch (JMSException e) + { + System.err.println(e.getMessage()); + } + } + } + + public abstract void start(); + +}
\ No newline at end of file diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java new file mode 100644 index 0000000000..c4edd9034f --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/ConnectionSetup.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; + +/** + * This ConnectionSetup is a wrapper around JNDI it creates a number of entries. + * + * It is equivalent to a PropertyFile of value: + * + * connectionfactory.local=amqp://guest:guest@clientid/test?brokerlist='localhost' + * connectionfactory.vm=amqp://guest:guest@clientid/test?brokerlist='vm://:1' + * + * queue.queue=example.MyQueue + * topic.topic=example.hierarical.topic + * + */ +public class ConnectionSetup +{ + final static String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + + final static String CONNECTION_JNDI_NAME = "local"; + final static String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='localhost'"; + + public static final String QUEUE_JNDI_NAME = "queue"; + final static String QUEUE_NAME = "example.MyQueue"; + + public static final String TOPIC_JNDI_NAME = "topic"; + final static String TOPIC_NAME = "example.hierarical.topic"; + + private Context _ctx; + + public ConnectionSetup() throws NamingException + { + + // Set the properties ... + Properties properties = new Properties(); + properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY); + properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME); + properties.put("connectionfactory." + "vm", "amqp://guest:guest@clientid/test?brokerlist='vm://:1'"); + + properties.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME); + properties.put("topic." + TOPIC_JNDI_NAME, TOPIC_NAME); + // Create the initial context + _ctx = new InitialContext(properties); + + } + + public ConnectionSetup(Properties properties) throws NamingException + { + _ctx = new InitialContext(properties); + } + + public ConnectionFactory getConnectionFactory() + { + + // Perform the lookups + try + { + return (ConnectionFactory) _ctx.lookup(CONNECTION_JNDI_NAME); + } + catch (NamingException e) + { + //ignore + } + return null; + } + + public Destination getDestination(String jndiName) + { + // Perform the lookups + try + { + return (Destination) _ctx.lookup(jndiName); + } + catch (ClassCastException cce) + { + //ignore + } + catch (NamingException ne) + { + //ignore + } + return null; + } + + + public void close() + { + try + { + _ctx.close(); + } + catch (NamingException e) + { + //ignore + } + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java new file mode 100644 index 0000000000..dd936e429f --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Publisher.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; + +/** + * A simple Publisher example. + * + * The class can take two arguments. + * java Publisher <destination> <msgCount> + * Where: + * destination is either 'topic' or 'queue' (Default: topic) + * msgCount is the number of messages to send (Default : 100) + * + */ +public class Publisher extends Client +{ + int _msgCount; + + public Publisher(String destination, int msgCount) + { + super(destination); + _msgCount = msgCount; + } + + public void start() + { + try + { + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer _producer = _session.createProducer(_destination); + + for (int msgCount = 0; msgCount < _msgCount; msgCount++) + { + _producer.send(_session.createTextMessage("msg:" + msgCount)); + System.out.println("Sent:" + msgCount); + } + + System.out.println("Done."); + _connection.close(); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + + + public static void main(String[] args) + { + + String destination = args.length > 2 ? args[1] : null; + + int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100; + + new Publisher(destination, msgCount).start(); + } + +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java new file mode 100644 index 0000000000..f2d736701f --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/pubsub/Subscriber.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.example.pubsub; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import java.util.concurrent.CountDownLatch; + + +/** + * Simple client that listens for the specified number of msgs on the given Destinaton + * + * The class can take two arguments. + * java Subscriber <destination> <msgCount> + * Where: + * destination is either 'topic' or 'queue' (Default: topic) + * msgCount is the number of messages to send (Default : 100) + */ +public class Subscriber extends Client implements MessageListener +{ + + CountDownLatch _count; + + public Subscriber(String destination, int msgCount) + { + super(destination); + _count = new CountDownLatch(msgCount); + } + + + public void start() + { + try + { + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _session.createDurableSubscriber((Topic) _setup.getDestination(ConnectionSetup.TOPIC_JNDI_NAME), + "exampleClient").setMessageListener(this); + _connection.start(); + _count.await(); + + System.out.println("Done"); + + _connection.close(); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + + public static void main(String[] args) + { + String destination = args.length > 2 ? args[1] : null; + int msgCount = args.length > 2 ? Integer.parseInt(args[2]) : 100; + + new Subscriber(destination, msgCount).start(); + } + + public void onMessage(Message message) + { + try + { + _count.countDown(); + System.out.println("Received msg:" + ((TextMessage) message).getText()); + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java new file mode 100644 index 0000000000..1a3d596a24 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.shared; + +public class ConnectionException extends Exception +{ + public ConnectionException(String msg, Throwable t) + { + super(msg, t); + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java new file mode 100644 index 0000000000..2987a9559b --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.shared; + +public class ContextException extends Exception +{ + public ContextException(String msg, Throwable t) + { + super(msg, t); + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java new file mode 100644 index 0000000000..54446cb6a7 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.shared; + +import java.io.*; + +/** + * Class that provides file related utility methods for utility use + */ +public class FileUtils { + + + //Reads file content into String + public static String getFileContent(String filePath) throws IOException + { + + BufferedReader reader = null; + String tempData = ""; + String eol = "\n\r"; + + try + { + String line; + reader = new BufferedReader(new FileReader(filePath)); + while ((line = reader.readLine()) != null) + { + if (!tempData.equals("")) + { + tempData = tempData + eol + line; + } + else + { + tempData = line; + } + } + } + finally + { + if (reader != null) + { + reader.close(); + } + } + return tempData; + } + + /* + * Reads xml from a file and returns it as an array of chars + */ + public static char[] getFileAsCharArray(String filePath) throws IOException + { + BufferedReader reader = null; + char[] tempChars = null; + String tempData = ""; + + try + { + String line; + reader = new BufferedReader(new FileReader(filePath)); + while ((line = reader.readLine()) != null) + { + tempData = tempData + line; + } + tempChars = tempData.toCharArray(); + } + finally + { + if (reader != null) + { + reader.close(); + } + } + return tempChars; + } + + /* + * Write String content to filename provided + */ + public static void writeStringToFile(String content, String path) throws IOException + { + + BufferedWriter writer = new BufferedWriter(new FileWriter(new File(path))); + writer.write(content); + writer.flush(); + writer.close(); + } + + /* + * Allows moving of files to a new dir and preserves the last bit of the name only + */ + public static void moveFileToNewDir(String path, String newDir) throws IOException + { + //get file name from current path + //while more files in dir publish them + File pathFile = new File(path); + if (pathFile.isDirectory()) + { + File[] files = pathFile.listFiles(); + for (File file : files) + { + moveFileToNewDir(file,newDir); + } + } + } + + /* + * Allows moving of a file to a new dir and preserves the last bit of the name only + */ + public static void moveFileToNewDir(File fileToMove, String newDir) throws IOException + { + moveFile(fileToMove,getArchiveFileName(fileToMove,newDir)); + } + + /* + * Moves file from a given path to a new path with String params + */ + public static void moveFile(String fromPath, String dest) throws IOException + { + moveFile(new File(fromPath),new File(dest)); + } + + /* + * Moves file from a given path to a new path with mixed params + */ + public static void moveFile(File fileToMove, String dest) throws IOException + { + moveFile(fileToMove,new File(dest)); + } + + /* + * Moves file from a given path to a new path with File params + */ + public static void moveFile(File fileToMove, File dest) throws IOException + { + fileToMove.renameTo(dest); + } + + /* + * Deletes a given file + */ + public static void deleteFile(String filePath) throws IOException + { + new File(filePath).delete(); + } + + private static String getArchiveFileName(File fileToMove, String archiveDir) + { + //get file name from current path + String fileName = fileToMove.getName(); + return archiveDir + File.separator + fileName; + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java new file mode 100644 index 0000000000..1328816602 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.example.shared; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +/** + * Class that provides helper methods for JNDI + */ +public class InitialContextHelper +{ + + public static final String _defaultPropertiesName = "example.properties"; + protected static Properties _fileProperties; + protected static InitialContext _initialContext; + protected static final Logger _log = LoggerFactory.getLogger(InitialContextHelper.class); + + public InitialContextHelper(String propertiesName) throws ContextException + { + try + { + if ((propertiesName == null) || (propertiesName.length() == 0)) + { + propertiesName = _defaultPropertiesName; + } + + _fileProperties = new Properties(); + ClassLoader cl = this.getClass().getClassLoader(); + + // NB: Need to change path to reflect package if moving classes around ! + InputStream is = cl.getResourceAsStream("org/apache/qpid/example/shared/" + propertiesName); + _fileProperties.load(is); + _initialContext = new InitialContext(_fileProperties); + } + catch (IOException e) + { + throw new ContextException(e.toString(), e); + } + catch (NamingException n) + { + throw new ContextException(n.toString(), n); + } + } + + public Properties getFileProperties() + { + return _fileProperties; + } + + public InitialContext getInitialContext() + { + return _initialContext; + } + +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java new file mode 100644 index 0000000000..c056f8a7da --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.shared; + +/** + * Constants used by AMS Publisher/Subscriber classes + */ +public class Statics { + + public static final String TOPIC_NAME = "EXAMPLE_TOPIC"; + + public static final String QUEUE_NAME = "EXAMPLE_QUEUE"; + + public static final String MONITOR_QUEUE_SUFFIX = "_MONITOR"; + + public static final String HOST_PROPERTY = "host"; + + public static final String PORT_PROPERTY = "port"; + + public static final String USER_PROPERTY = "user"; + + public static final String PWD_PROPERTY = "pwd"; + + public static final String TOPIC_PROPERTY = "topic"; + + public static final String QUEUE_PROPERTY = "queue"; + + public static final String VIRTUAL_PATH_PROPERTY = "virtualpath"; + + public static final String ARCHIVE_PATH = "archivepath"; + + public static final String CLIENT_PROPERTY = "client"; + + public static final String FILENAME_PROPERTY = "filename"; + + public static final String DEFAULT_USER = "guest"; + + public static final String DEFAULT_PWD = "guest"; + + +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties new file mode 100644 index 0000000000..a60e3964ad --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +# use the following property to configure the default connector +#java.naming.provider.url - ignored. + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.local = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' + +# register some queues in JNDI using the form +# queue.[jndiName] = [physicalName] +queue.MyQueue = example.MyQueue + +# register some topics in JNDI using the form +# topic.[jndiName] = [physicalName] +topic.ibmStocks = stocks.nyse.ibm + +# Register an AMQP destination in JNDI +# NOTE: Qpid currently only supports direct,topics and headers +# destination.[jniName] = [BindingURL] +destination.direct = direct://amq.direct//directQueue diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java new file mode 100644 index 0000000000..8a0ff88448 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Client.java @@ -0,0 +1,263 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.example.simple.reqresp; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +public class Client implements MessageListener +{ + final String BROKER = "localhost"; + + final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + + final String CONNECTION_JNDI_NAME = "local"; + final String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + BROKER + "'"; + + final String QUEUE_JNDI_NAME = "queue"; + final String QUEUE_NAME = "example.RequestQueue"; + + + private InitialContext _ctx; + + private CountDownLatch _shutdownHook = new CountDownLatch(1); + + public Client() + { + setupJNDI(); + + Connection connection; + Session session; + Destination responseQueue; + + //Setup the connection. Create producer to sent message and consumer to receive the repsonse. + MessageProducer _producer; + try + { + connection = ((ConnectionFactory) lookupJNDI(CONNECTION_JNDI_NAME)).createConnection(); + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination requestQueue = (Queue) lookupJNDI(QUEUE_JNDI_NAME); + + closeJNDI(); + + //Setup a message _producer to send message to the queue the server is consuming from + _producer = session.createProducer(requestQueue); + + //Create a temporary queue that this client will listen for responses on then create a consumer + //that consumes message from this temporary queue. + responseQueue = session.createTemporaryQueue(); + + MessageConsumer responseConsumer = session.createConsumer(responseQueue); + + //Set a listener to asynchronously deal with responses. + responseConsumer.setMessageListener(this); + + // Now the connection is setup up start it. + connection.start(); + } + catch (JMSException e) + { + System.err.println("Unable to setup connection, client and producer on broker"); + return; + } + + // Setup the message to send + TextMessage txtMessage; + try + { + //Now create the actual message you want to send + txtMessage = session.createTextMessage("Request Process"); + + //Set the reply to field to the temp queue you created above, this is the queue the server will respond to + txtMessage.setJMSReplyTo(responseQueue); + + //Set a correlation ID so when you get a response you know which sent message the response is for + //If there is never more than one outstanding message to the server then the + //same correlation ID can be used for all the messages...if there is more than one outstanding + //message to the server you would presumably want to associate the correlation ID with this message + + txtMessage.setJMSCorrelationID(txtMessage.getJMSMessageID()); + } + catch (JMSException e) + { + System.err.println("Unable to create message"); + return; + + } + + try + { + _producer.send(txtMessage); + } + catch (JMSException e) + { + //Handle the exception appropriately + } + + try + { + System.out.println("Sent Request Message ID :" + txtMessage.getJMSMessageID()); + } + catch (JMSException e) + { + //Handle exception more appropriately. + } + + //Wait for the return message to arrive + try + { + _shutdownHook.await(); + } + catch (InterruptedException e) + { + // Ignore this as we are quitting anyway. + } + + //Close the connection + try + { + connection.close(); + } + catch (JMSException e) + { + System.err.println("A problem occured while shutting down the connection : " + e); + } + } + + + /** + * Implementation of the Message Listener interface. + * This is where message will be asynchronously delivered. + * + * @param message + */ + public void onMessage(Message message) + { + String messageText; + try + { + if (message instanceof TextMessage) + { + TextMessage textMessage = (TextMessage) message; + messageText = textMessage.getText(); + System.out.println("messageText = " + messageText); + System.out.println("Correlation ID " + message.getJMSCorrelationID()); + + _shutdownHook.countDown(); + } + else + { + System.err.println("Unexpected message delivered"); + } + } + catch (JMSException e) + { + //Handle the exception appropriately + } + } + + /** + * Lookup the specified name in the JNDI Context. + * + * @param name The string name of the object to lookup + * + * @return The object or null if nothing exists for specified name + */ + private Object lookupJNDI(String name) + { + try + { + return _ctx.lookup(name); + } + catch (NamingException e) + { + System.err.println("Error looking up '" + name + "' in JNDI Context:" + e); + } + + return null; + } + + /** + * Setup the JNDI context. + * + * In this case we are simply using a Properties object to store the pairing information. + * + * Further details can be found on the wiki site here: + * + * @see : http://cwiki.apache.org/qpid/how-to-use-jndi.html + */ + private void setupJNDI() + { + // Set the properties ... + Properties properties = new Properties(); + properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY); + properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME); + properties.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME); + + // Create the initial context + Context ctx = null; + try + { + _ctx = new InitialContext(properties); + } + catch (NamingException e) + { + System.err.println("Error Setting up JNDI Context:" + e); + } + } + + /** Close the JNDI Context to keep everything happy. */ + private void closeJNDI() + { + try + { + _ctx.close(); + } + catch (NamingException e) + { + System.err.println("Unable to close JNDI Context : " + e); + } + } + + + public static void main(String[] args) + { + new Client(); + } +} + diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java new file mode 100644 index 0000000000..9c284eee97 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/simple/reqresp/Server.java @@ -0,0 +1,236 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.example.simple.reqresp; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.io.BufferedReader; +import java.io.BufferedInputStream; +import java.io.Reader; +import java.io.InputStreamReader; +import java.io.IOException; + +public class Server implements MessageListener +{ + final String BROKER = "localhost"; + + final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + + final String CONNECTION_JNDI_NAME = "local"; + final String CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + BROKER + "'"; + + final String QUEUE_JNDI_NAME = "queue"; + final String QUEUE_NAME = "example.RequestQueue"; + + + private InitialContext _ctx; + private Session _session; + private MessageProducer _replyProducer; + private CountDownLatch _shutdownHook = new CountDownLatch(1); + + public Server() + { + setupJNDI(); + + Connection connection; + try + { + connection = ((ConnectionFactory) lookupJNDI(CONNECTION_JNDI_NAME)).createConnection(); + + _session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination requestQueue = (Queue) lookupJNDI(QUEUE_JNDI_NAME); + + closeJNDI(); + + //Setup a message producer to respond to messages from clients, we will get the destination + //to send to from the JMSReplyTo header field from a Message so we MUST set the destination here to null. + this._replyProducer = _session.createProducer(null); + + //Set up a consumer to consume messages off of the request queue + MessageConsumer consumer = _session.createConsumer(requestQueue); + consumer.setMessageListener(this); + + //Now start the connection + connection.start(); + } + catch (JMSException e) + { + //Handle the exception appropriately + System.err.println("JMSException occured setting up server :" + e); + return; + } + + System.out.println("Server process started and waiting for messages."); + + //Wait to process an single message then quit. + while (_shutdownHook.getCount() != 0) + { + try + { + _shutdownHook.await(); + } + catch (InterruptedException e) + { + // Ignore this as we are quitting anyway. + } + } + + //Close the connection + try + { + connection.close(); + } + catch (JMSException e) + { + System.err.println("A problem occured while shutting down the connection : " + e); + } + } + + public void onMessage(Message message) + { + try + { + TextMessage response = this._session.createTextMessage(); + + //Check we have the right message type. + if (message instanceof TextMessage) + { + TextMessage txtMsg = (TextMessage) message; + String messageText = txtMsg.getText(); + + //Perform the request + System.out.println("Received request:" + messageText + " for message :" + message.getJMSMessageID()); + + //Set the response back to the client + response.setText("Response to Request:" + messageText); + } + + //Set the correlation ID from the received message to be the correlation id of the response message + //this lets the client identify which message this is a response to if it has more than + //one outstanding message to the server + response.setJMSCorrelationID(message.getJMSMessageID()); + + try + { + System.out.println("Received message press enter to send response...."); + new BufferedReader(new InputStreamReader(System.in)).readLine(); + } + catch (IOException e) + { + //Error attemptying to pause + } + + //Send the response to the Destination specified by the JMSReplyTo field of the received message. + _replyProducer.send(message.getJMSReplyTo(), response); + } + catch (JMSException e) + { + //Handle the exception appropriately + } + + _shutdownHook.countDown(); + } + + /** + * Lookup the specified name in the JNDI Context. + * + * @param name The string name of the object to lookup + * + * @return The object or null if nothing exists for specified name + */ + private Object lookupJNDI(String name) + { + try + { + return _ctx.lookup(name); + } + catch (NamingException e) + { + System.err.println("Error looking up '" + name + "' in JNDI Context:" + e); + } + + return null; + } + + /** + * Setup the JNDI context. + * + * In this case we are simply using a Properties object to store the pairing information. + * + * Further details can be found on the wiki site here: + * + * @see : http://cwiki.apache.org/qpid/how-to-use-jndi.html + */ + private void setupJNDI() + { + // Set the properties ... + Properties properties = new Properties(); + properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY); + properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME); + properties.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME); + + // Create the initial context + Context ctx = null; + try + { + _ctx = new InitialContext(properties); + } + catch (NamingException e) + { + System.err.println("Error Setting up JNDI Context:" + e); + } + } + + /** Close the JNDI Context to keep everything happy. */ + private void closeJNDI() + { + try + { + _ctx.close(); + } + catch (NamingException e) + { + System.err.println("Unable to close JNDI Context : " + e); + } + } + + + public static void main(String[] args) + { + new Server(); + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java new file mode 100644 index 0000000000..d43b823a13 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.subscriber; + +import org.apache.qpid.example.shared.Statics; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +import javax.jms.*; +/** + * Subclass of Subscriber which consumes a heartbeat message + */ + +public class MonitoredSubscriber extends Subscriber +{ + protected String _monitorDestinationName; + + private static final Logger _logger = LoggerFactory.getLogger(MonitoredSubscriber.class); + + private static MessageConsumer _monitorConsumer; + + public MonitoredSubscriber() + { + super(); + //lookup queue name and append suffix + _monitorDestinationName = _destination.toString() + Statics.MONITOR_QUEUE_SUFFIX; + } + + /** + * MessageListener implementation for this subscriber + */ + public static class MonitorMessageListener implements MessageListener + { + private String _name; + + public MonitorMessageListener(String name) + { + _name = name; + + } + + /** + * Listens for heartbeat messages and acknowledges them + * @param message + */ + public void onMessage(javax.jms.Message message) + { + _logger.info(_name + " monitor got message '" + message + "'"); + + try + { + _logger.debug("Monitor acknowledging recieved message"); + + //Now acknowledge the message to clear it from our queue + message.acknowledge(); + } + catch(JMSException j) + { + _logger.error("Monitor caught JMSException trying to acknowledge message receipt"); + j.printStackTrace(); + } + catch(Exception e) + { + _logger.error("Monitor caught unexpected exception trying to handle message"); + e.printStackTrace(); + } + } + } + + /** + * Subscribes to Queue and attaches additional monitor listener + */ + public void subscribeAndMonitor() + { + try + { + _connection = _connectionFactory.createConnection(); + + //create a transactional session + Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + //Queue is non-exclusive and not deleted when last consumer detaches + Destination destination = session.createQueue(_monitorDestinationName); + + //Create a consumer with a destination of our queue which will use defaults for prefetch etc + _monitorConsumer = session.createConsumer(destination); + + //give the monitor message listener a name of it's own + _monitorConsumer.setMessageListener(new MonitoredSubscriber.MonitorMessageListener + ("MonitorListener " + System.currentTimeMillis())); + + MonitoredSubscriber._logger.info("Starting monitored subscription ..."); + + MonitoredSubscriber._connection.start(); + + //and now start ordinary consumption too + subscribe(); + } + catch (Throwable t) + { + _logger.error("Fatal error: " + t); + t.printStackTrace(); + } + } + + /** + * Stop consuming + */ + public void stopMonitor() + { + try + { + _monitorConsumer.close(); + _monitorConsumer = null; + stop(); + } + catch(JMSException j) + { + _logger.error("JMSException trying to Subscriber.stop: " + j.getStackTrace()); + } + } + +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java new file mode 100644 index 0000000000..5e78107182 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.subscriber; + + +/** + * Allows you to simply start a monitored subscriber + */ +public class MonitoredSubscriptionWrapper { + + private static MonitoredSubscriber _subscriber; + + /** + * Create a monitored subscriber and start it + * @param args - no params required + */ + public static void main(String args[]) + { + _subscriber = new MonitoredSubscriber(); + + _subscriber.subscribe(); + } + + /** + * Stop subscribing now ... + */ + public static void stop() + { + _subscriber.stop(); + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java new file mode 100644 index 0000000000..f75558299c --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.subscriber; + +import org.apache.qpid.client.AMQConnectionFactory; + +import javax.jms.*; +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.naming.InitialContext; + +import org.apache.qpid.example.shared.InitialContextHelper; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +/** + * Subscriber which consumes messages from a queue + */ + +public class Subscriber +{ + private static final Logger _log = LoggerFactory.getLogger(Subscriber.class); + + protected static Connection _connection; + + protected static MessageConsumer _consumer; + + protected static InitialContextHelper _contextHelper; + + protected static AMQConnectionFactory _connectionFactory; + + protected Destination _destination; + + public Subscriber() + { + try + { + //get an initial context from default properties + _contextHelper = new InitialContextHelper(null); + InitialContext ctx = _contextHelper.getInitialContext(); + + //then create a connection using the AMQConnectionFactory + _connectionFactory = (AMQConnectionFactory) ctx.lookup("local"); + + //lookup queue from context + _destination = (Destination) ctx.lookup("MyQueue"); + + } + catch (Exception e) + { + e.printStackTrace(); + _log.error("Exception", e); + } + } + + /** + * Listener class that handles messages + */ + public static class ExampleMessageListener implements MessageListener + { + private String _name; + + public ExampleMessageListener(String name) + { + _name = name; + } + + /** + * Listens for message callbacks, handles and then acknowledges them + * @param message - the message received + */ + public void onMessage(javax.jms.Message message) + { + _log.info(_name + " got message '" + message + "'"); + + try + { + //NB: Handle your message appropriately for your application here + //do some stuff + + _log.debug("Acknowledging recieved message"); + + //Now acknowledge the message to clear it from our queue + message.acknowledge(); + } + catch(JMSException j) + { + _log.error("JMSException trying to acknowledge message receipt"); + j.printStackTrace(); + } + catch(Exception e) + { + _log.error("Unexpected exception trying to handle message"); + e.printStackTrace(); + } + } + } + + /** + * Subscribes to example Queue and attaches listener + */ + public void subscribe() + { + _log.info("Starting subscription ..."); + + try + { + _connection = _connectionFactory.createConnection(); + + //Non transactional session using client acknowledgement + Session session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + //Create a consumer with a destination of our queue which will use defaults for prefetch etc + _consumer = session.createConsumer(_destination); + + //give the message listener a name of it's own + _consumer.setMessageListener(new ExampleMessageListener("MessageListener " + System.currentTimeMillis())); + + _connection.start(); + } + catch (Throwable t) + { + _log.error("Fatal error: " + t); + t.printStackTrace(); + } + + _log.info("Waiting for messages ..."); + + //wait for messages and sleep to survive failover + try + { + while(true) + { + Thread.sleep(Long.MAX_VALUE); + } + } + catch (Exception e) + { + _log.warn("Exception while Subscriber sleeping",e); + } + } + + /** + * Stop consuming and close connection + */ + public void stop() + { + try + { + _consumer.close(); + _consumer = null; + _connection.stop(); + _connection.close(); + } + catch(JMSException j) + { + _log.error("JMSException trying to Subscriber.stop: " + j.getStackTrace()); + } + } + +} + + + + diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java new file mode 100644 index 0000000000..f8fbf63037 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.subscriber; + +/** + * Allows you to simply start a subscriber + */ +public class SubscriptionWrapper { + + private static Subscriber _subscriber; + + /** + * Create a subscriber and start it + * @param args + */ + public static void main(String args[]) + { + _subscriber = new Subscriber(); + + _subscriber.subscribe(); + } + + /** + * Stop subscribing now ... + */ + public static void stop() + { + _subscriber.stop(); + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java new file mode 100644 index 0000000000..d7eb138523 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java @@ -0,0 +1,171 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.example.transport; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.channels.SocketChannel; +import java.util.UUID; + +/** + * This is a simple application that demonstrates how you can use the Qpid AMQP interfaces to use existing sockets as + * the transport for the Client API. + * + * The Demo here runs twice: + * 1. Just to show a simple publish and receive. + * 2. To demonstrate how to use existing sockets and utilise the underlying client failover mechnaism. + */ +public class ExistingSocketConnectorDemo implements ConnectionListener +{ + private static boolean DEMO_FAILOVER = false; + + public static void main(String[] args) throws IOException, URLSyntaxException, AMQException, JMSException + { + System.out.println("Testing socket connection to localhost:5672."); + + new ExistingSocketConnectorDemo(); + + System.out.println("Testing socket connection failover between localhost:5672 and localhost:5673."); + + DEMO_FAILOVER = true; + + new ExistingSocketConnectorDemo(); + } + + Connection _connection; + MessageProducer _producer; + Session _session; + + String Socket1_ID = UUID.randomUUID().toString(); + String Socket2_ID = UUID.randomUUID().toString(); + + + + /** Here we can see the broker we are connecting to is set to be 'socket:///' signifying we will provide the socket. */ + public final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket://" + Socket1_ID + ";socket://" + Socket2_ID + "'"; + + + public ExistingSocketConnectorDemo() throws IOException, URLSyntaxException, AMQException, JMSException + { + + Socket socket = SocketChannel.open().socket(); + socket.connect(new InetSocketAddress("localhost", 5672)); + + TransportConnection.registerOpenSocket(Socket1_ID, socket); + + + _connection = new AMQConnection(CONNECTION); + + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer = _session.createConsumer(_session.createQueue("Queue")); + + _producer = _session.createProducer(_session.createQueue("Queue")); + + _connection.start(); + + if (!DEMO_FAILOVER) + { + _producer.send(_session.createTextMessage("Simple Test")); + } + else + { + // Using the Qpid interfaces we can set a listener that allows us to demonstrate failover + ((AMQConnection) _connection).setConnectionListener(this); + + System.out.println("Testing failover: Please ensure second broker running on localhost:5673 and shutdown broker on 5672."); + } + + //We do a blocking receive here so that we can demonstrate failover. + Message message = consumer.receive(); + + System.out.println("Recevied :" + message); + + _connection.close(); + } + + // ConnectionListener Interface + + public void bytesSent(long count) + { + //not used in this example + } + public void bytesReceived(long count) + { + //not used in this example + } + + public boolean preFailover(boolean redirect) + { + /** + * This method is called before the underlying client library starts to reconnect. This gives us the opportunity + * to set a new socket for the failover to occur on. + */ + try + { + Socket socket = SocketChannel.open().socket(); + + socket.connect(new InetSocketAddress("localhost", 5673)); + + // This is the new method to pass in an open socket for the connection to use. + TransportConnection.registerOpenSocket(Socket2_ID, socket); + } + catch (IOException e) + { + e.printStackTrace(); + return false; + } + return true; + } + + public boolean preResubscribe() + { + //not used in this example - but must return true to allow the resubscription of existing clients. + return true; + } + + public void failoverComplete() + { + // Now that failover has completed we can send a message that the receiving thread will pick up + try + { + _producer.send(_session.createTextMessage("Simple Failover Test")); + } + catch (JMSException e) + { + e.printStackTrace(); + } + } +} diff --git a/RC7/qpid/java/client/example/src/main/java/runSample.sh b/RC7/qpid/java/client/example/src/main/java/runSample.sh new file mode 100755 index 0000000000..e330fb0c36 --- /dev/null +++ b/RC7/qpid/java/client/example/src/main/java/runSample.sh @@ -0,0 +1,72 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# Work out the CLASSPATH divider +UNAME=`uname -s` +case $UNAME in + CYGWIN*) + DIVIDER=";" + ;; + *) + DIVIDER=":" +;; +esac + +if test "'x$QPID_HOME'" != "'x'" +then + QPID_HOME=$QPID_HOME +else + QPID_HOME="/usr/share/java/" +fi +echo "Using QPID_HOME: $QPID_HOME" + +if test "'x$QPID_SAMPLE'" != "'x'" +then + QPID_SAMPLE=$QPID_SAMPLE +else + QPID_SAMPLE="/usr/share/doc/rhm-0.2" +fi +echo "Using QPID_SAMPLE: $QPID_SAMPLE" + + +# set the CLASSPATH +CLASSPATH=`find "$QPID_HOME" -name '*.jar' | tr '\n' "$DIVIDER"` + + +# compile the samples +javac -cp "$CLASSPATH" -sourcepath "$QPID_SAMPLE" -d . `find $QPID_SAMPLE -name '*.java'` + +# Add output classes to CLASSPATH +CLASSPATH="$CLASSPATH$DIVIDER$." + +# Set VM parameters +QPID_PARAM="$QPID_PARAM -Dlog4j.configuration=file://$PWD/log4j.xml" + + +# Check if the user supplied a sample classname +if test "'x$1'" = "'x'" +then + echo "No sample classname specified" + exit; +else + java -cp $CLASSPATH $QPID_PARAM $* +fi |
