diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2011-12-18 05:09:07 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2011-12-18 05:09:07 +0000 |
| commit | 8581b766bdd0fe06b128ea0f4fdf814435e618cb (patch) | |
| tree | 0ba5887a27cd45b207be2b7eaa998a193a1b035f /qpid/java/jca/example/src | |
| parent | 345dac43c4453608f3b53728dcd310ff4767a544 (diff) | |
| download | qpid-python-8581b766bdd0fe06b128ea0f4fdf814435e618cb.tar.gz | |
QPID-3044: Implement JCA Adapter for Java JMS client
- Large contributions from Weston Price & Kevin Conner
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1220336 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/jca/example/src')
14 files changed, 1287 insertions, 0 deletions
diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/client/QpidRequestResponseClient.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/client/QpidRequestResponseClient.java new file mode 100644 index 0000000000..734df1c0f3 --- /dev/null +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/client/QpidRequestResponseClient.java @@ -0,0 +1,159 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jca.example.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.InitialContext; + +import org.apache.qpid.jca.example.ejb.QpidUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QpidRequestResponseClient implements MessageListener, Runnable +{ + private static final Logger _log = LoggerFactory.getLogger(QpidRequestResponseClient.class); + + private static final String DEFAULT_CF_JNDI = "QpidConnectionFactory"; + private static final String DEFAULT_DESTINATION_JNDI = "QpidResponderQueue"; + private static final String DEFAULT_MESSAGE = "Hello, World!"; + private static final int DEFAULT_MESSAGE_COUNT = 1; + private static final int DEFAULT_THREAD_COUNT = 1; + private static CountDownLatch THREAD_LATCH; + private static InitialContext CONTEXT; + + private ConnectionFactory _connectionFactory; + private Connection _connection; + private Session _session; + private CountDownLatch _latch = null; + private int _count = DEFAULT_MESSAGE_COUNT; + + /** + * @param args + */ + public static void main(String[] args) throws Exception + { + int threadCount = (System.getProperty("thread.count") == null) + ? DEFAULT_THREAD_COUNT : Integer.valueOf(System.getProperty("thread.count")); + + _log.debug("Creating " + threadCount + " threads for execution."); + + THREAD_LATCH = new CountDownLatch(threadCount); + + CONTEXT = new InitialContext(); + + for(int i = 0; i < threadCount; i++) + { + new Thread(new QpidRequestResponseClient()).start(); + } + + _log.debug("Waiting for " + threadCount + " to finish."); + THREAD_LATCH.await(10, TimeUnit.SECONDS); + + QpidUtil.closeResources(CONTEXT); + } + + @Override + public void onMessage(Message message) + { + _latch.countDown(); + + if(message instanceof TextMessage) + { + try + { + _log.debug("Thread " + Thread.currentThread().getId() + " received response message with content " + ((TextMessage)message).getText()); + } + catch(Exception e) + { + _log.error(e.getMessage(), e); + } + } + + if(_latch.getCount() == _count) + { + QpidUtil.closeResources(_session, _connection); + } + + THREAD_LATCH.countDown(); + + } + + public void run() + { + MessageProducer producer = null; + Destination requestQueue = null; + Destination responseQueue = null; + + String cfName = (System.getProperty("qpid.cf.name") == null) ? DEFAULT_CF_JNDI : System.getProperty("qpid.cf.name"); + String destName = (System.getProperty("qpid.dest.name") == null) ? DEFAULT_DESTINATION_JNDI : System.getProperty("qpid.dest.name"); + + try + { + _count = (System.getProperty("message.count") == null) ? DEFAULT_MESSAGE_COUNT : Integer.valueOf(System.getProperty("message.count")); + _latch = new CountDownLatch(_count); + + _connectionFactory = (ConnectionFactory)QpidTestUtil.getFromJNDI(CONTEXT, cfName); + requestQueue = (Destination)QpidTestUtil.getFromJNDI(CONTEXT, destName); + _connection = _connectionFactory.createConnection(); + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = _session.createProducer(requestQueue); + responseQueue = _session.createTemporaryQueue(); + _session.createConsumer(responseQueue).setMessageListener(this); + + + _connection.start(); + + String content = (System.getProperty("qpid.message") == null) ? DEFAULT_MESSAGE : System.getProperty("qpid.message"); + + for(int i = 0; i < _count; i++) + { + TextMessage message = _session.createTextMessage(); + message.setText(content); + message.setJMSReplyTo(responseQueue); + producer.send(message); + + } + + _latch.await(); + + } + catch(Exception e) + { + _log.error(e.getMessage(), e); + } + finally + { + QpidUtil.closeResources(producer); + } + + } + +} diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/client/QpidTestClient.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/client/QpidTestClient.java new file mode 100644 index 0000000000..a5a33e36ec --- /dev/null +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/client/QpidTestClient.java @@ -0,0 +1,135 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jca.example.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.Context; +import javax.naming.InitialContext; + +import org.apache.qpid.jca.example.ejb.QpidTest; +import org.apache.qpid.jca.example.ejb.QpidUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QpidTestClient +{ + private static final Logger _log = LoggerFactory.getLogger(QpidTestClient.class); + + private static final String DEFAULT_EJB_JNDI = "QpidTestBean/remote"; + private static final String DEFAULT_CF_JNDI = "QpidConnectionFactory"; + private static final String DEFAULT_MESSAGE = "Hello,World!"; + private static final int DEFAULT_MESSAGE_COUNT = 1; + private static final boolean DEFAULT_USE_TOPIC = false; + private static final boolean DEFAULT_USE_EJB = true; + private static final String DEFAULT_DESTINATION_JNDI = "HelloQueue"; + private static final boolean DEFAULT_SAY_GOODBYE = false; + + public static void main(String[] args) throws Exception + { + String content = (System.getProperty("qpid.message") == null) ? DEFAULT_MESSAGE : System.getProperty("qpid.message"); + boolean useEJB = (System.getProperty("use.ejb") == null) ? DEFAULT_USE_EJB : Boolean.valueOf(System.getProperty("use.ejb")); + int total = (System.getProperty("message.count") == null) ? DEFAULT_MESSAGE_COUNT : Integer.valueOf(System.getProperty("message.count")); + boolean useTopic = (System.getProperty("use.topic") == null) ? DEFAULT_USE_TOPIC : Boolean.valueOf(System.getProperty("use.topic")); + String destType = (useTopic) ? "Topic" : "Queue"; + boolean goodbye = (System.getProperty("say.goodbye") == null) ? DEFAULT_SAY_GOODBYE : Boolean.valueOf(System.getProperty("say.goodbye")); + + _log.debug("Environment: "); + _log.debug("JNDI IntialContectFactory: " + System.getProperty("java.naming.factory.initial")); + _log.debug("JNDI Provider: " + System.getProperty("java.naming.provider.url")); + _log.debug("Message content: " + content); + _log.debug("Message count:" + total); + _log.debug("Protocol: " + ((useEJB) ? "EJB" : "JMS")); + _log.debug("Destination Type: " + destType); + _log.debug("Say GoodBye : " + goodbye); + + Context context = new InitialContext(); + + if(useEJB) + { + + String ejbName = (System.getProperty("qpid.ejb.name") == null) ? DEFAULT_EJB_JNDI : System.getProperty("qpid.ejb.name"); + + QpidTest ejb = (QpidTest)QpidTestUtil.getFromJNDI(context, ejbName); + + _log.debug("Found SLSB " + ejbName + "in JNDI"); + ejb.testQpidAdapter(content, total, useTopic, false, goodbye); + + } + else + { + ConnectionFactory connectionFactory = null; + Connection connection = null; + Session session = null; + MessageProducer messageProducer = null; + Destination destination = null; + int count = 0; + + String cfName = (System.getProperty("qpid.cf.name") == null) ? DEFAULT_CF_JNDI : System.getProperty("qpid.cf.name"); + String destName = (System.getProperty("qpid.dest.name") == null) ? DEFAULT_DESTINATION_JNDI : System.getProperty("qpid.dest.name"); + + _log.debug("Using JMS with CF name " + cfName + " and Destination name " + destName + " to send " + total + " message(s) with content " + content); + + try + { + _log.debug("Using JNDI at " + System.getProperty("java.naming.provider.url")); + + connectionFactory = (ConnectionFactory)QpidTestUtil.getFromJNDI(context, cfName); + destination = (Destination)QpidTestUtil.getFromJNDI(context, destName); + + _log.debug("Using CF: " + connectionFactory); + _log.debug("Destination " + destination); + + connection = connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + messageProducer = session.createProducer(destination); + + _log.debug("Sending " + total + " message(s) with content: " + content + " to destination " + destName); + + for(int i = 0; i < total; i++) + { + TextMessage message = session.createTextMessage(content); + message.setBooleanProperty("say.goodbye", goodbye); + messageProducer.send(message); + count++; + } + + + } + catch(Exception e) + { + e.printStackTrace(); + _log.error(e.getMessage()); + } + finally + { + QpidUtil.closeResources(session, connection, context); + } + + _log.debug(count + " message(s) sent successfully"); + } + + } +} diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/client/QpidTestUtil.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/client/QpidTestUtil.java new file mode 100644 index 0000000000..7a53335d79 --- /dev/null +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/client/QpidTestUtil.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jca.example.client; + +import javax.naming.Context; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.spi.NamingManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QpidTestUtil { + private static final Logger _log = LoggerFactory.getLogger(QpidTestUtil.class); + + /* + * Encapsulate looking up in JNDI and working around a seeming bug in OpenEJB which returns a + * Reference when it should just return an object constructed from it + */ + static Object getFromJNDI(Context context, String name) throws NamingException, Exception + { + Object o = context.lookup(name); + if (o instanceof Reference) + { + _log.debug("Got a Reference back from JNDI for " + name + " - working around"); + return NamingManager.getObjectInstance(o, null, null, null); + } + else + { + return o; + } + } + +} diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidGoodByeListenerBean.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidGoodByeListenerBean.java new file mode 100644 index 0000000000..9cf220de2a --- /dev/null +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidGoodByeListenerBean.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jca.example.ejb; + +import java.util.Date; + +import javax.ejb.ActivationConfigProperty; +import javax.ejb.MessageDriven; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.TextMessage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@MessageDriven(activationConfig = { + @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"), + @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"), + @ActivationConfigProperty(propertyName = "destination", propertyValue = "@qpid.goodbye.queue.jndi.name@"), + @ActivationConfigProperty(propertyName = "connectionURL", propertyValue = "@broker.url@"), + @ActivationConfigProperty(propertyName = "useLocalTx", propertyValue = "false"), + @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10") +}) +public class QpidGoodByeListenerBean implements MessageListener +{ + private static final Logger _log = LoggerFactory.getLogger(QpidGoodByeListenerBean.class); + + @Override + public void onMessage(Message message) + { + try + { + if(message instanceof TextMessage) + { + String content = ((TextMessage)message).getText(); + + _log.info("Received text message with contents: [" + content + "] at " + new Date()); + } + } + catch(Exception e) + { + _log.error(e.getMessage(), e); + } + + } + +} diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidGoodByeSubscriberBean.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidGoodByeSubscriberBean.java new file mode 100644 index 0000000000..8ad8aaa482 --- /dev/null +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidGoodByeSubscriberBean.java @@ -0,0 +1,27 @@ +package org.apache.qpid.jca.example.ejb; + +import javax.ejb.ActivationConfigProperty; +import javax.ejb.MessageDriven; +import javax.jms.Message; +import javax.jms.MessageListener; + +@MessageDriven(activationConfig = { + @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"), + @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"), + @ActivationConfigProperty(propertyName = "destination", propertyValue = "@qpid.goodbye.topic.jndi.name@"), + @ActivationConfigProperty(propertyName = "connectionURL", propertyValue = "@broker.url@"), + @ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "NotDurable"), + @ActivationConfigProperty(propertyName = "subscriptionName", propertyValue = "hello.Topic"), + @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10") +}) + +public class QpidGoodByeSubscriberBean implements MessageListener +{ + + @Override + public void onMessage(Message message) + { + System.out.println(message); + } + +} diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java new file mode 100644 index 0000000000..d6d08d1557 --- /dev/null +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jca.example.ejb; + +import java.util.Date; + +import javax.annotation.PostConstruct; +import javax.ejb.ActivationConfigProperty; +import javax.ejb.MessageDriven; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.InitialContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@MessageDriven(activationConfig = { + @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"), + @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"), + @ActivationConfigProperty(propertyName = "destination", propertyValue = "@qpid.hello.queue.jndi.name@"), + @ActivationConfigProperty(propertyName = "connectionURL", propertyValue = "@broker.url@"), + @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10") +}) +public class QpidHelloListenerBean implements MessageListener +{ + private static final Logger _log = LoggerFactory.getLogger(QpidHelloListenerBean.class); + + private ConnectionFactory _connectionFactory; + + private Destination _queue; + + @PostConstruct + public void init() + { + InitialContext context = null; + + try + { + context = new InitialContext(); + _connectionFactory = (ConnectionFactory)context.lookup("java:comp/env/QpidJMSXA"); + _queue = (Destination)context.lookup("@qpid.goodbye.queue.jndi.name@"); + + } + catch(Exception e) + { + _log.error(e.getMessage(), e); + } + finally + { + QpidUtil.closeResources(context); + } + + } + + @Override + public void onMessage(Message message) + { + Connection connection = null; + Session session = null; + MessageProducer messageProducer = null; + TextMessage response = null; + + try + { + if(message instanceof TextMessage) + { + String content = ((TextMessage)message).getText(); + + _log.info("Received text message with contents: [" + content + "] at " + new Date()); + + StringBuffer temp = new StringBuffer(); + temp.append("QpidHelloListenerBean received message with content: [" + content); + temp.append("] at " + new Date()); + + if(message.propertyExists("say.goodbye") && message.getBooleanProperty("say.goodbye")) + { + connection = _connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + messageProducer = session.createProducer(_queue); + response = session.createTextMessage(temp.toString()); + messageProducer.send(response); + } + } + } + catch(Exception e) + { + _log.error(e.getMessage(), e); + } + finally + { + QpidUtil.closeResources(session, connection); + } + } +} diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloSubscriberBean.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloSubscriberBean.java new file mode 100644 index 0000000000..43ccf9defd --- /dev/null +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloSubscriberBean.java @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jca.example.ejb; + +import java.util.Date; + +import javax.annotation.PostConstruct; +import javax.ejb.ActivationConfigProperty; +import javax.ejb.MessageDriven; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.InitialContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +@MessageDriven(activationConfig = { + @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"), + @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"), + @ActivationConfigProperty(propertyName = "destination", propertyValue = "@qpid.hello.topic.jndi.name@"), + @ActivationConfigProperty(propertyName = "connectionURL", propertyValue = "@broker.url@"), + @ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "NotDurable"), + @ActivationConfigProperty(propertyName = "subscriptionName", propertyValue = "hello.Topic"), + @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10") +}) +public class QpidHelloSubscriberBean implements MessageListener +{ + private static final Logger _log = LoggerFactory.getLogger(QpidHelloSubscriberBean.class); + + private ConnectionFactory _connectionFactory; + + private Destination _topic; + + @PostConstruct + public void init() + { + InitialContext context = null; + + try + { + context = new InitialContext(); + _connectionFactory = (ConnectionFactory)context.lookup("java:comp/env/QpidJMSXA"); + _topic = (Destination)context.lookup("@qpid.goodbye.topic.jndi.name@"); + + } + catch(Exception e) + { + _log.error(e.getMessage(), e); + } + finally + { + QpidUtil.closeResources(context); + } + + } + + @Override + public void onMessage(Message message) + { + Connection connection = null; + Session session = null; + MessageProducer messageProducer = null; + TextMessage response = null; + + try + { + if(message instanceof TextMessage) + { + String content = ((TextMessage)message).getText(); + + _log.info("Received text message with contents: [" + content + "] at " + new Date()); + + StringBuffer temp = new StringBuffer(); + temp.append("QpidHelloSubscriberBean received message with content: [" + content); + temp.append("] at " + new Date()); + + if(message.propertyExists("say.goodbye") && message.getBooleanProperty("say.goodbye")) + { + connection = _connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + messageProducer = session.createProducer(_topic); + response = session.createTextMessage(temp.toString()); + messageProducer.send(response); + } + } + } + catch(Exception e) + { + _log.error(e.getMessage(), e); + } + finally + { + QpidUtil.closeResources(session, connection); + } + } +} diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidJMSResponderBean.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidJMSResponderBean.java new file mode 100644 index 0000000000..74d6fb6d89 --- /dev/null +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidJMSResponderBean.java @@ -0,0 +1,100 @@ +package org.apache.qpid.jca.example.ejb; + +import java.util.Date; + +import javax.annotation.PostConstruct; +import javax.ejb.ActivationConfigProperty; +import javax.ejb.MessageDriven; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.InitialContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@MessageDriven(activationConfig = { + @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"), + @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"), + @ActivationConfigProperty(propertyName = "destination", propertyValue = "@qpid.responder.queue.jndi.name@"), + @ActivationConfigProperty(propertyName = "connectionURL", propertyValue = "@broker.url@"), + @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10") +}) +public class QpidJMSResponderBean implements MessageListener +{ + + private static final Logger _log = LoggerFactory.getLogger(QpidJMSResponderBean.class); + + private ConnectionFactory _connectionFactory; + + @PostConstruct + public void init() + { + InitialContext context = null; + + try + { + context = new InitialContext(); + _connectionFactory = (ConnectionFactory)context.lookup("java:comp/env/QpidJMSXA"); + + } + catch(Exception e) + { + _log.error(e.getMessage(), e); + } + finally + { + QpidUtil.closeResources(context); + } + + } + + @Override + public void onMessage(Message message) + { + Connection connection = null; + Session session = null; + MessageProducer messageProducer = null; + TextMessage response = null; + + try + { + if(message instanceof TextMessage) + { + String content = ((TextMessage)message).getText(); + + _log.info("Received text message with contents: [" + content + "] at " + new Date()); + + StringBuffer temp = new StringBuffer(); + temp.append("QpidJMSResponderBean received message with content: [" + content); + temp.append("] at " + new Date()); + + if(message.getJMSReplyTo() != null) + { + connection = _connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + messageProducer = session.createProducer(message.getJMSReplyTo()); + response = session.createTextMessage(); + response.setText(temp.toString()); + messageProducer.send(response); + } + else + { + _log.warn("Response was requested with no JMSReplyToDestination set. Will not respond to message."); + } + } + } + catch(Exception e) + { + _log.error(e.getMessage(), e); + } + finally + { + QpidUtil.closeResources(session, connection); + } + } +} diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidTest.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidTest.java new file mode 100644 index 0000000000..14488fda53 --- /dev/null +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidTest.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.jca.example.ejb; + +public interface QpidTest +{ + public void testQpidAdapter(String content) throws Exception; + public void testQpidAdapter(String content, int count) throws Exception; + public void testQpidAdapter(String content, int count, boolean useTopic) throws Exception; + public void testQpidAdapter(String content, int count, boolean useTopic, boolean respond, boolean sayGoodbye) throws Exception; +} diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidTestBean.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidTestBean.java new file mode 100644 index 0000000000..17e37b9475 --- /dev/null +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidTestBean.java @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jca.example.ejb; + +import javax.annotation.PostConstruct; +import javax.ejb.Stateless; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.InitialContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Stateless +public class QpidTestBean implements QpidTestRemote, QpidTestLocal +{ + + private static final Logger _log = LoggerFactory.getLogger(QpidTestBean.class); + + private ConnectionFactory _connectionFactory; + + private Destination _queue; + + private Destination _topic; + + @PostConstruct + public void init() + { + InitialContext context = null; + + try + { + context = new InitialContext(); + _connectionFactory = (ConnectionFactory)context.lookup("java:comp/env/QpidJMSXA"); + _queue = (Destination)context.lookup("@qpid.hello.queue.jndi.name@"); + _topic = (Destination)context.lookup("@qpid.hello.topic.jndi.name@"); + + } + catch(Exception e) + { + _log.error(e.getMessage(), e); + } + finally + { + QpidUtil.closeResources(context); + } + + } + @Override + public void testQpidAdapter(String content) throws Exception + { + testQpidAdapter(content, 1); + } + + @Override + public void testQpidAdapter(String content, int count) throws Exception + { + testQpidAdapter(content, count, false); + } + + public void testQpidAdapter(final String content, int count, boolean useTopic) throws Exception + { + testQpidAdapter(content, count, useTopic, false, false); + } + + @Override + public void testQpidAdapter(String content, int count, boolean useTopic, + boolean respond, boolean sayGoodbye) throws Exception + { + Connection connection = null; + Session session = null; + MessageProducer messageProducer = null; + + _log.info("Sending " + count + " message(s) to MDB with content " + content); + + try + { + connection = _connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + messageProducer = (useTopic) ? session.createProducer(_topic) : session.createProducer(_queue); + + for(int i = 0; i < count; i++) + { + TextMessage message = session.createTextMessage(content); + message.setBooleanProperty("say.goodbye", sayGoodbye); + messageProducer.send(message); + } + + } + catch(Exception e) + { + _log.error(e.getMessage(), e); + throw e; + } + finally + { + QpidUtil.closeResources(messageProducer, session, connection); + } + } + +} diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidTestLocal.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidTestLocal.java new file mode 100644 index 0000000000..73a0de08c2 --- /dev/null +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidTestLocal.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.jca.example.ejb; + +import javax.ejb.Local; + +@Local +public interface QpidTestLocal extends QpidTest +{ +} diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidTestRemote.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidTestRemote.java new file mode 100644 index 0000000000..2abb4d71f5 --- /dev/null +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidTestRemote.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.jca.example.ejb; + +import javax.ejb.Remote; + +@Remote +public interface QpidTestRemote extends QpidTest +{ +} diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidUtil.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidUtil.java new file mode 100644 index 0000000000..d96a4e8163 --- /dev/null +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidUtil.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jca.example.ejb; + +import java.lang.reflect.Method; +import java.util.Enumeration; + +import javax.jms.Message; +import javax.jms.TextMessage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QpidUtil +{ + private static final Logger _log = LoggerFactory.getLogger(QpidTestBean.class); + + public static void handleMessage(String beanName, final Message message) throws Exception + { + if(message instanceof TextMessage) + { + String content = ((TextMessage)message).getText(); + _log.debug(beanName + ": Received text message with contents " + content); + + if(content.contains("PrintEnv")) + { + printJMSHeaders(message); + printProperties(message); + } + } + } + + @SuppressWarnings("rawtypes") + public static void printProperties(final Message message) throws Exception + { + _log.debug("Priting Message Properties:"); + + Enumeration e = message.getPropertyNames(); + + while(e.hasMoreElements()) + { + _log.debug(e + ":" + message.getObjectProperty(e.toString())); + } + } + + public static void printJMSHeaders(final Message message) throws Exception + { + _log.debug("JMSCorrelationID:" + message.getJMSCorrelationID()); + _log.debug("JMSDeliveryMode:" + message.getJMSDeliveryMode()); + _log.debug("JMSExpires:" + message.getJMSExpiration()); + _log.debug("JMSMessageID:" + message.getJMSMessageID()); + _log.debug("JMSPriority:" + message.getJMSPriority()); + _log.debug("JMSTimestamp:" + message.getJMSTimestamp()); + _log.debug("JMSType:" + message.getJMSType()); + _log.debug("JMSReplyTo:" + message.getJMSReplyTo()); + } + + public static void closeResources(Object...objects) + { + try + { + for(Object object: objects) + { + Method close = object.getClass().getMethod("close", new Class[]{}); + close.invoke(object, new Object[]{}); + } + } + catch(Exception ignore) + { + } + } +} diff --git a/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/web/QpidTestServlet.java b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/web/QpidTestServlet.java new file mode 100644 index 0000000000..71289b22c3 --- /dev/null +++ b/qpid/java/jca/example/src/main/java/org/apache/qpid/jca/example/web/QpidTestServlet.java @@ -0,0 +1,209 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.jca.example.web; +import java.io.IOException; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.naming.InitialContext; +import javax.servlet.ServletConfig; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.transaction.UserTransaction; + +import org.apache.qpid.jca.example.ejb.QpidTest; +import org.apache.qpid.jca.example.ejb.QpidUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("serial") +public class QpidTestServlet extends HttpServlet +{ + private static final Logger _log = LoggerFactory.getLogger(QpidTestServlet.class); + + private static final String DEFAULT_MESSAGE = "Hello, World!"; + private static final int DEFAULT_COUNT = 1; + private static final boolean DEFAULT_TOPIC = false; + private static final boolean DEFAULT_XA = false; + private static final boolean DEFAULT_SAY_GOODBYE = true; + + private ConnectionFactory _connectionFactory; + + private Destination _queue; + + private Destination _topic; + + public void init(ServletConfig config) throws ServletException + { + + InitialContext context = null; + + try + { + context = new InitialContext(); + _connectionFactory = (ConnectionFactory)context.lookup("java:comp/env/QpidJMSXA"); + _queue = (Destination)context.lookup("@qpid.hello.queue.jndi.name@"); + _topic = (Destination)context.lookup("@qpid.hello.topic.jndi.name@"); + + } + catch(Exception e) + { + _log.error(e.getMessage(), e); + throw new ServletException(e.getMessage(), e); + } + finally + { + QpidUtil.closeResources(context); + } + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException + { + doPost(req, resp); + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException + { + InitialContext ctx = null; + Connection connection = null; + Session session = null; + MessageProducer messageProducer = null; + UserTransaction ut = null; + boolean useXA = false; + boolean rollback = false; + + try + { + String content = (req.getParameter("message") == null) ? DEFAULT_MESSAGE : req.getParameter("message"); + boolean useEJB = (req.getParameter("useEJB") == null) ? false : Boolean.valueOf(req.getParameter("useEJB")); + int count = (req.getParameter("count") == null) ? DEFAULT_COUNT : Integer.valueOf(req.getParameter("count")); + boolean useTopic = (req.getParameter("useTopic") == null) ? DEFAULT_TOPIC : Boolean.valueOf(req.getParameter("useTopic")); + useXA = (req.getParameter("useXA") == null) ? DEFAULT_XA : Boolean.valueOf(req.getParameter("useXA")); + ctx = new InitialContext(); + boolean sayGoodBye = (req.getParameter("sayGoodBye") == null) ? DEFAULT_SAY_GOODBYE : Boolean.valueOf(req.getParameter("sayGoodBye")); + + _log.debug("Environment: "); + _log.debug("Message content: " + content); + _log.debug("Message count:" + count); + _log.debug("Protocol: " + ((useEJB) ? "EJB" : "JMS")); + _log.debug("Destination Type: " + ((useTopic) ? "Topic" : "Queue")); + _log.debug("Using XA: " + useXA); + _log.debug("Say GoodBye: ", sayGoodBye); + + resp.getOutputStream().println("Environment: "); + resp.getOutputStream().println("Message content: " + content); + resp.getOutputStream().println("Message count:" + count); + resp.getOutputStream().println("Protocol: " + ((useEJB) ? "EJB" : "JMS")); + resp.getOutputStream().println("Destination Type: " + ((useTopic) ? "Topic" : "Queue")); + resp.getOutputStream().println("Using XA: " + useXA); + resp.getOutputStream().println("Say GoodBye: " + sayGoodBye); + + if(useEJB) + { + QpidTest ejb = (QpidTest)ctx.lookup("java:comp/env/QpidTestBean"); + ejb.testQpidAdapter(content, count, useTopic, false, sayGoodBye); + } + else + { + if(useXA) + { + ut = (UserTransaction)ctx.lookup("java:comp/UserTransaction"); + ut.begin(); + } + + connection = _connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + messageProducer = (useTopic) ? session.createProducer(_topic) : session.createProducer(_queue); + + for(int i = 0; i < count; i++) + { + TextMessage message = session.createTextMessage(content); + message.setBooleanProperty("say.goodbye", sayGoodBye); + messageProducer.send(message); + } + + } + + resp.getOutputStream().println("Sent " + count + " messages with content '" + content + "'"); + resp.getOutputStream().flush(); + + } + catch(Exception e) + { + + if(useXA && ut != null) + { + try + { + rollback = true; + ut.setRollbackOnly(); + } + catch(Exception ex) + { + _log.error(ex.getMessage(), ex); + throw new ServletException(ex.getMessage(), ex); + } + } + + _log.error(e.getMessage(), e); + throw new ServletException(e.getMessage(), e); + } + finally + { + if(useXA && ut != null) + { + try + { + if(rollback) + { + ut.rollback(); + } + else + { + ut.commit(); + } + } + catch(Exception e) + { + _log.error(e.getMessage(), e); + throw new ServletException(e.getMessage(), e); + + } + } + + QpidUtil.closeResources(session, connection, ctx); + } + } + + + +} + + |
