From c22e94b05d3959af14eceb71938f6e33aeee0de2 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Thu, 8 Mar 2012 20:49:09 +0000 Subject: QPID-3732 Committing a patch by Weston Price. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1298571 13f79535-47bb-0310-9956-ffa450edef68 --- java/jca/example/build-geronimo-properties.xml | 4 +- java/jca/example/build-jboss-properties.xml | 2 +- java/jca/example/build.xml | 5 +- java/jca/example/conf/geronimo-ra.xml | 12 ++- java/jca/example/conf/jboss-web.xml | 5 + java/jca/example/conf/qpid-jca-ds.xml | 15 ++- java/jca/example/conf/web.xml | 11 +++ java/jca/example/qpid-jca-example-properties.xml | 15 ++- .../jca/example/ejb/QpidGoodByeSubscriberBean.java | 1 - .../jca/example/ejb/QpidHelloListenerBean.java | 4 + .../jca/example/ejb/QpidHelloSubscriberBean.java | 1 - .../qpid/jca/example/ejb/QpidJMSResponderBean.java | 13 ++- .../qpid/jca/example/web/QpidTestServlet.java | 43 +++++++- .../apache/qpid/ra/QpidRAManagedConnection.java | 109 ++++++--------------- 14 files changed, 138 insertions(+), 102 deletions(-) (limited to 'java/jca') diff --git a/java/jca/example/build-geronimo-properties.xml b/java/jca/example/build-geronimo-properties.xml index 2251b7a2df..7c5bd33cb0 100644 --- a/java/jca/example/build-geronimo-properties.xml +++ b/java/jca/example/build-geronimo-properties.xml @@ -29,9 +29,11 @@ - + + + diff --git a/java/jca/example/build-jboss-properties.xml b/java/jca/example/build-jboss-properties.xml index 5fc4053cf8..b7edf3d796 100644 --- a/java/jca/example/build-jboss-properties.xml +++ b/java/jca/example/build-jboss-properties.xml @@ -27,7 +27,7 @@ - + diff --git a/java/jca/example/build.xml b/java/jca/example/build.xml index 2717ce84d7..ab70ee45fc 100644 --- a/java/jca/example/build.xml +++ b/java/jca/example/build.xml @@ -80,7 +80,8 @@ - + + @@ -96,7 +97,7 @@ - + diff --git a/java/jca/example/conf/geronimo-ra.xml b/java/jca/example/conf/geronimo-ra.xml index e3e74ebc62..1c7210d2ee 100644 --- a/java/jca/example/conf/geronimo-ra.xml +++ b/java/jca/example/conf/geronimo-ra.xml @@ -114,8 +114,16 @@ org.apache.qpid.ra.admin.QpidQueue org.apache.qpid.ra.admin.QpidQueueImpl - ResponderQueue - @qpid.responder.queue.dest.address@ + RequestQueue + @qpid.request.queue.dest.address@ + + + + org.apache.qpid.ra.admin.QpidQueue + org.apache.qpid.ra.admin.QpidQueueImpl + + ResponseQueue + @qpid.response.queue.dest.address@ diff --git a/java/jca/example/conf/jboss-web.xml b/java/jca/example/conf/jboss-web.xml index edacf8d418..32a97d1c35 100644 --- a/java/jca/example/conf/jboss-web.xml +++ b/java/jca/example/conf/jboss-web.xml @@ -25,6 +25,11 @@ javax.jms.ConnectionFactory java:/QpidJMSXA + + QpidJMS + javax.jms.ConnectionFactory + java:/QpidJMS + QpidTestBean qpid-jcaex/QpidTestBean/local diff --git a/java/jca/example/conf/qpid-jca-ds.xml b/java/jca/example/conf/qpid-jca-ds.xml index 80fb828b55..3c12e25020 100644 --- a/java/jca/example/conf/qpid-jca-ds.xml +++ b/java/jca/example/conf/qpid-jca-ds.xml @@ -72,15 +72,24 @@ - QpidResponderQueue + name="qpid.jca:name=QpidRequestQueue"> + QpidRequestQueue jboss.jca:service=RARDeployment,name='@rar.name@' org.apache.qpid.ra.admin.QpidQueue - destinationAddress=@qpid.responder.queue.dest.address@ + destinationAddress=@qpid.request.queue.dest.address@ + + QpidResponseQueue + jboss.jca:service=RARDeployment,name='@rar.name@' + org.apache.qpid.ra.admin.QpidQueue + + destinationAddress=@qpid.response.queue.dest.address@ + + QpidConnectionFactory diff --git a/java/jca/example/conf/web.xml b/java/jca/example/conf/web.xml index d87c578606..6e53d7beba 100644 --- a/java/jca/example/conf/web.xml +++ b/java/jca/example/conf/web.xml @@ -29,11 +29,22 @@ 1 + + QpidRequestResponseServlet + QpidRequestResponseServlet + org.apache.qpid.jca.example.web.QpidRequestResponseServlet + 1 + + QpidTestServlet /qpid + + QpidRequestResponseServlet + /qpid-reqresp + diff --git a/java/jca/example/qpid-jca-example-properties.xml b/java/jca/example/qpid-jca-example-properties.xml index eb219a05e1..ab0f6267ba 100644 --- a/java/jca/example/qpid-jca-example-properties.xml +++ b/java/jca/example/qpid-jca-example-properties.xml @@ -45,8 +45,10 @@ value="hello.Queue;{create:always, node:{type:queue, x-declare:{auto-delete:true}}}"/> - + + @@ -58,8 +60,10 @@ value="BURL:direct://amq.direct//hello.Queue?routingkey='hello.Queue'"/> - + + @@ -75,5 +79,6 @@ - + + diff --git a/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidGoodByeSubscriberBean.java b/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidGoodByeSubscriberBean.java index 64e0effb1f..2e43898ed7 100644 --- a/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidGoodByeSubscriberBean.java +++ b/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidGoodByeSubscriberBean.java @@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory; @ActivationConfigProperty(propertyName = "destination", propertyValue = "@qpid.goodbye.topic.jndi.name@"), @ActivationConfigProperty(propertyName = "connectionURL", propertyValue = "@broker.url@"), @ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "NotDurable"), - @ActivationConfigProperty(propertyName = "subscriptionName", propertyValue = "hello.Topic"), @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10") }) diff --git a/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java b/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java index 0056e7b0b8..37b5ffbc76 100644 --- a/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java +++ b/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloListenerBean.java @@ -66,6 +66,10 @@ public class QpidHelloListenerBean implements MessageListener try { + _log.info(message.getJMSDestination().getClass().getName()); + + javax.jms.Queue queue = (javax.jms.Queue)message.getJMSDestination(); + _log.info("QueueName is: " + queue.getQueueName()); if(message instanceof TextMessage) { String content = ((TextMessage)message).getText(); diff --git a/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloSubscriberBean.java b/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloSubscriberBean.java index 560de36e48..0d87cb6955 100644 --- a/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloSubscriberBean.java +++ b/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidHelloSubscriberBean.java @@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory; @ActivationConfigProperty(propertyName = "destination", propertyValue = "@qpid.hello.topic.jndi.name@"), @ActivationConfigProperty(propertyName = "connectionURL", propertyValue = "@broker.url@"), @ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "NotDurable"), - @ActivationConfigProperty(propertyName = "subscriptionName", propertyValue = "hello.Topic"), @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10") }) public class QpidHelloSubscriberBean implements MessageListener diff --git a/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidJMSResponderBean.java b/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidJMSResponderBean.java index e7b44e10ca..d4562511d0 100644 --- a/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidJMSResponderBean.java +++ b/java/jca/example/src/main/java/org/apache/qpid/jca/example/ejb/QpidJMSResponderBean.java @@ -27,6 +27,7 @@ import javax.ejb.ActivationConfigProperty; import javax.ejb.MessageDriven; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; @@ -40,7 +41,7 @@ import org.slf4j.LoggerFactory; @MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"), @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"), - @ActivationConfigProperty(propertyName = "destination", propertyValue = "@qpid.responder.queue.jndi.name@"), + @ActivationConfigProperty(propertyName = "destination", propertyValue = "@qpid.request.queue.jndi.name@"), @ActivationConfigProperty(propertyName = "connectionURL", propertyValue = "@broker.url@"), @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10") }) @@ -72,10 +73,12 @@ public class QpidJMSResponderBean implements MessageListener temp.append("QpidJMSResponderBean received message with content: [" + content); temp.append("] at " + new Date()); + connection = _connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + if(message.getJMSReplyTo() != null) { - connection = _connectionFactory.createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _log.info("Sending response via JMSReplyTo"); messageProducer = session.createProducer(message.getJMSReplyTo()); response = session.createTextMessage(); response.setText(temp.toString()); @@ -83,8 +86,10 @@ public class QpidJMSResponderBean implements MessageListener } else { - _log.warn("Response was requested with no JMSReplyToDestination set. Will not respond to message."); + _log.info("JMSReplyTo is null. Will not respond to message."); } + + } } catch(Exception e) diff --git a/java/jca/example/src/main/java/org/apache/qpid/jca/example/web/QpidTestServlet.java b/java/jca/example/src/main/java/org/apache/qpid/jca/example/web/QpidTestServlet.java index 11a61e762c..07c3e38f60 100644 --- a/java/jca/example/src/main/java/org/apache/qpid/jca/example/web/QpidTestServlet.java +++ b/java/jca/example/src/main/java/org/apache/qpid/jca/example/web/QpidTestServlet.java @@ -52,6 +52,7 @@ public class QpidTestServlet extends HttpServlet private static final int DEFAULT_COUNT = 1; private static final boolean DEFAULT_TOPIC = false; private static final boolean DEFAULT_XA = false; + private static final boolean DEFAULT_TX = false; private static final boolean DEFAULT_SAY_GOODBYE = true; @Resource(@jndi.scheme@="@qpid.xacf.jndi.name@") @@ -82,6 +83,7 @@ public class QpidTestServlet extends HttpServlet UserTransaction ut = null; boolean useXA = false; boolean rollback = false; + boolean useTX = false; try { @@ -90,8 +92,10 @@ public class QpidTestServlet extends HttpServlet int count = (req.getParameter("count") == null) ? DEFAULT_COUNT : Integer.valueOf(req.getParameter("count")); boolean useTopic = (req.getParameter("useTopic") == null) ? DEFAULT_TOPIC : Boolean.valueOf(req.getParameter("useTopic")); useXA = (req.getParameter("useXA") == null) ? DEFAULT_XA : Boolean.valueOf(req.getParameter("useXA")); + useTX = (req.getParameter("useTX") == null) ? DEFAULT_TX : Boolean.valueOf(req.getParameter("useTX")); ctx = new InitialContext(); boolean sayGoodBye = (req.getParameter("sayGoodBye") == null) ? DEFAULT_SAY_GOODBYE : Boolean.valueOf(req.getParameter("sayGoodBye")); + useTX = (req.getParameter("useTX") == null) ? DEFAULT_TOPIC : Boolean.valueOf(req.getParameter("DEFAULT_TX")); _log.debug("Environment: "); _log.debug("Message content: " + content); @@ -122,7 +126,7 @@ public class QpidTestServlet extends HttpServlet } connection = _connectionFactory.createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(useTX, Session.AUTO_ACKNOWLEDGE); messageProducer = (useTopic) ? session.createProducer(_topic) : session.createProducer(_queue); for(int i = 0; i < count; i++) @@ -155,6 +159,19 @@ public class QpidTestServlet extends HttpServlet } } + if(useTX) + { + try + { + session.rollback(); + } + catch(Exception ex) + { + _log.error(ex.getMessage(), ex); + throw new ServletException(ex.getMessage(), ex); + } + } + _log.error(e.getMessage(), e); throw new ServletException(e.getMessage(), e); } @@ -181,12 +198,32 @@ public class QpidTestServlet extends HttpServlet } } + if(useTX && !useXA) + { + try + { + + if(rollback) + { + session.rollback(); + } + else + { + session.commit(); + } + } + catch(Exception e) + { + + _log.error(e.getMessage(), e); + throw new ServletException(e.getMessage(), e); + } + } + QpidUtil.closeResources(session, connection, ctx); } } - - } diff --git a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java index 53896d8872..eccf77aff2 100644 --- a/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java +++ b/java/jca/src/main/java/org/apache/qpid/ra/QpidRAManagedConnection.java @@ -34,10 +34,11 @@ import java.util.concurrent.locks.ReentrantLock; import javax.jms.Connection; import javax.jms.ExceptionListener; import javax.jms.JMSException; +import javax.jms.QueueConnection; import javax.jms.ResourceAllocationException; import javax.jms.Session; -import javax.jms.QueueConnection; import javax.jms.TopicConnection; +import javax.jms.XAConnection; import javax.jms.XAQueueConnection; import javax.jms.XASession; import javax.jms.XATopicConnection; @@ -260,7 +261,20 @@ public class QpidRAManagedConnection implements ManagedConnection, ExceptionList } catch (JMSException e) { - _log.debug("Error closing session " + this, e); + _log.debug("Error closing XASession " + this, e); + } + + try + { + if(_session != null) + { + _session.close(); + } + + } + catch(JMSException e) + { + _log.error("Error closing Session " + this, e); } if (_connection != null) @@ -585,7 +599,7 @@ public class QpidRAManagedConnection implements ManagedConnection, ExceptionList */ protected Session getSession() throws JMSException { - if(_xaSession != null && !_mcf.getUseLocalTx()) + if(_xaSession != null && !_mcf.getUseLocalTx() && _inManagedTx) { if (_log.isTraceEnabled()) { @@ -761,107 +775,44 @@ public class QpidRAManagedConnection implements ManagedConnection, ExceptionList { if (_userName != null && _password != null) { - if(!transacted) - { - _connection = _mcf.getCleanAMQConnectionFactory().createXATopicConnection(_userName, _password); - } - else - { - _connection = _mcf.getCleanAMQConnectionFactory().createTopicConnection(_userName, _password); - } + _connection = _mcf.getCleanAMQConnectionFactory().createXATopicConnection(_userName, _password); } else { - if(!transacted) - { - _connection = _mcf.getDefaultAMQConnectionFactory().createXATopicConnection(); - } - else - { - _connection = _mcf.getDefaultAMQConnectionFactory().createTopicConnection(); - } + _connection = _mcf.getDefaultAMQConnectionFactory().createXATopicConnection(); } - if(!transacted) - { - _xaSession = ((XATopicConnection)_connection).createXATopicSession(); - } - else - { - _session = ((TopicConnection)_connection).createTopicSession(transacted, acknowledgeMode); - } + _xaSession = ((XATopicConnection)_connection).createXATopicSession(); + _session = ((TopicConnection)_connection).createTopicSession(transacted, acknowledgeMode); + } else if (_cri.getType() == QpidRAConnectionFactory.QUEUE_CONNECTION) { if (_userName != null && _password != null) { - if(!transacted) - { - _connection = _mcf.getCleanAMQConnectionFactory().createXAQueueConnection(_userName, _password); - } - else - { - _connection = _mcf.getCleanAMQConnectionFactory().createQueueConnection(_userName, _password); - } + _connection = _mcf.getCleanAMQConnectionFactory().createXAQueueConnection(_userName, _password); } else { - if(!transacted) - { - _connection = _mcf.getDefaultAMQConnectionFactory().createXAQueueConnection(); - } - else - { - _connection = _mcf.getDefaultAMQConnectionFactory().createQueueConnection(); - } + _connection = _mcf.getDefaultAMQConnectionFactory().createXAQueueConnection(); } - if(!transacted) - { - _xaSession = ((XAQueueConnection)_connection).createXAQueueSession(); + _xaSession = ((XAQueueConnection)_connection).createXAQueueSession(); + _session = ((QueueConnection)_connection).createQueueSession(transacted, acknowledgeMode); - } - else - { - _session = ((QueueConnection)_connection).createQueueSession(transacted, acknowledgeMode); - - } } else { if (_userName != null && _password != null) { - if(!transacted) - { - _connection = _mcf.getCleanAMQConnectionFactory().createXAConnection(_userName, _password); - } - else - { - _connection = _mcf.getCleanAMQConnectionFactory().createConnection(_userName, _password); - } - } - else - { - if(!transacted) - { - _connection = _mcf.getDefaultAMQConnectionFactory().createXAConnection(); - } - else - { - _connection = _mcf.getDefaultAMQConnectionFactory().createConnection(); - } - } - - if(!transacted) - { - _xaSession = ((XAQueueConnection)_connection).createXASession(); - + _connection = _mcf.getCleanAMQConnectionFactory().createXAConnection(_userName, _password); } else { - _session = ((QueueConnection)_connection).createSession(transacted, acknowledgeMode); - + _connection = _mcf.getDefaultAMQConnectionFactory().createXAConnection(); } + _xaSession = ((XAConnection)_connection).createXASession(); + _session = _connection.createSession(transacted, acknowledgeMode); } _connection.setExceptionListener(this); -- cgit v1.2.1