diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-06-18 17:23:50 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-06-18 17:23:50 +0000 |
| commit | d2fbb54eaa3e3a0e733740dac093282e1d60c48f (patch) | |
| tree | 096e0b39c1b72fbac31c4be9961737d64dfe210a /qpid/java/client | |
| parent | 2407c96b7e388c7455e47112c3d7ef133ce62649 (diff) | |
| download | qpid-python-d2fbb54eaa3e3a0e733740dac093282e1d60c48f.tar.gz | |
QPID-4934 : [Java XA] Stop redundant session creation for XA Sessions, improve logging for XA
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1494214 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
3 files changed, 46 insertions, 30 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 1baaff738b..6b87316e87 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -24,7 +24,6 @@ import static org.apache.qpid.transport.Option.UNRELIABLE; import java.lang.ref.WeakReference; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -95,6 +94,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class); private static Timer timer = new Timer("ack-flusher", true); + private final String _name; private static class Flusher extends TimerTask { @@ -153,6 +153,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic private boolean _isHardError = Boolean.getBoolean("qpid.session.legacy_exception_behaviour"); //--- constructors + /** * Creates a new session on a connection. * @@ -173,28 +174,38 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark, defaultPrefetchLowMark); _qpidConnection = qpidConnection; - if (name == null) + _name = name; + _qpidSession = createSession(); + + if (maxAckDelay > 0) + { + flushTask = new Flusher(this); + timer.schedule(flushTask, new Date(), maxAckDelay); + } + } + + protected Session createSession() + { + Session qpidSession; + if (_name == null) { - _qpidSession = _qpidConnection.createSession(1); + qpidSession = _qpidConnection.createSession(1); } else { - _qpidSession = _qpidConnection.createSession(name,1); + qpidSession = _qpidConnection.createSession(_name,1); } - _qpidSession.setSessionListener(this); if (isTransacted()) { - _qpidSession.txSelect(); - _qpidSession.setTransacted(true); + qpidSession.txSelect(); + qpidSession.setTransacted(true); } + qpidSession.setSessionListener(this); - if (maxAckDelay > 0) - { - flushTask = new Flusher(this); - timer.schedule(flushTask, new Date(), maxAckDelay); - } + return qpidSession; } + /** * Creates a new session on a connection with the default 0-10 message factory. * diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index 6341510c2f..6c745feea8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -88,7 +88,7 @@ public class XAResourceImpl implements AMQXAResource { if (_logger.isDebugEnabled()) { - _logger.debug("commit tx branch with xid: ", xid); + _logger.debug("commit tx branch with xid: {} ", xid); } Future<XaResult> future = _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE); @@ -132,7 +132,7 @@ public class XAResourceImpl implements AMQXAResource { if (_logger.isDebugEnabled()) { - _logger.debug("end tx branch with xid: ", xid); + _logger.debug("end tx branch with xid: {}", xid); } switch (flag) { @@ -191,7 +191,7 @@ public class XAResourceImpl implements AMQXAResource { if (_logger.isDebugEnabled()) { - _logger.debug("forget tx branch with xid: ", xid); + _logger.debug("forget tx branch with xid: {}", xid); } _xaSession.getQpidSession().dtxForget(convertXid(xid)); try @@ -281,7 +281,7 @@ public class XAResourceImpl implements AMQXAResource { if (_logger.isDebugEnabled()) { - _logger.debug("prepare ", xid); + _logger.debug("prepare {}", xid); } Future<XaResult> future = _xaSession.getQpidSession().dtxPrepare(convertXid(xid)); XaResult result = null; @@ -361,7 +361,7 @@ public class XAResourceImpl implements AMQXAResource { if (_logger.isDebugEnabled()) { - _logger.debug("rollback tx branch with xid: ", xid); + _logger.debug("rollback tx branch with xid: {}", xid); } Future<XaResult> future = _xaSession.getQpidSession().dtxRollback(convertXid(xid)); @@ -428,7 +428,7 @@ public class XAResourceImpl implements AMQXAResource { if (_logger.isDebugEnabled()) { - _logger.debug("start tx branch with xid: ", xid); + _logger.debug("start tx branch with xid: {}", xid); } switch (flag) { @@ -524,7 +524,7 @@ public class XAResourceImpl implements AMQXAResource // this should not happen if (_logger.isDebugEnabled()) { - _logger.debug("got unexpected status value: ", status); + _logger.debug("got unexpected status value: {}", status); } //A resource manager error has occured in the transaction branch. throw new XAException(XAException.XAER_RMERR); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index e01ec8578d..fa0bdcb4c9 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -75,8 +75,15 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic boolean transacted, int ackMode, MessageFactoryRegistry registry, int defaultPrefetchHigh, int defaultPrefetchLow, String name) { - super(qpidConnection, con, channelId, transacted, ackMode, registry, defaultPrefetchHigh, defaultPrefetchLow, name); - createSession(); + super(qpidConnection, + con, + channelId, + transacted, + ackMode, + registry, + defaultPrefetchHigh, + defaultPrefetchLow, + name); _xaResource = new XAResourceImpl(this); } @@ -86,11 +93,13 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic /** * Create a qpid session. */ - public void createSession() + @Override + public org.apache.qpid.transport.Session createSession() { _qpidDtxSession = getQpidConnection().createSession(0,true); - _qpidDtxSession.setSessionListener(this); _qpidDtxSession.dtxSelect(); + _qpidDtxSession.setSessionListener(this); + return _qpidDtxSession; } /** @@ -101,11 +110,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic */ public Session getSession() throws JMSException { - if (_jmsSession == null) - { - _jmsSession = getAMQConnection().createSession(true, getAcknowledgeMode()); - } - return _jmsSession; + return this; } /** @@ -162,7 +167,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic */ public QueueSession getQueueSession() throws JMSException { - return (QueueSession) getSession(); + return this; } // interface XATopicSession @@ -175,7 +180,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic */ public TopicSession getTopicSession() throws JMSException { - return (TopicSession) getSession(); + return this; } @Override |
