From 8f5813d5c4bc68c29cd3ed86540041d1463b30f7 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 11 May 2013 11:05:32 +0000 Subject: QPID-4829 : [JMS AMQP 1.0] Sessions added to started connections are not themselves started git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1481290 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/amqp_1_0/jms/impl/ConnectionImpl.java | 21 ++++++++++++++++----- .../qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java | 15 ++++++++------- .../apache/qpid/amqp_1_0/jms/impl/SessionImpl.java | 3 +-- .../org/apache/qpid/amqp_1_0/client/Receiver.java | 5 +++++ 4 files changed, 30 insertions(+), 14 deletions(-) (limited to 'java') diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 73701889b5..4028d32e15 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -163,6 +163,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect connect(); started = true; } + try { SessionImpl session = new SessionImpl(this, acknowledgeMode); @@ -170,6 +171,11 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect session.setTopicSession(_isTopicConnection); _sessions.add(session); + if(_state == State.STARTED) + { + session.start(); + } + return session; } catch(JMSException e) @@ -191,9 +197,17 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect throw e; } } - } + + } + + void removeSession(SessionImpl session) + { + synchronized (_lock) + { + _sessions.remove(session); + } } private void reconnect(String networkHost, int port, String hostName) @@ -410,10 +424,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public boolean isStarted() { - synchronized (_lock) - { - return _state == State.STARTED; - } + return _state == State.STARTED; } void setQueueConnection(final boolean queueConnection) diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index def5ae5931..cb42c49bbc 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -188,15 +188,16 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi { checkClosed(); _messageListener = messageListener; - _session.messageListenerSet( this ); _receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener() - { + { + + public void messageArrived(final Receiver receiver) + { + _session.messageArrived(MessageConsumerImpl.this); + } + }); + _session.messageListenerSet( this ); - public void messageArrived(final Receiver receiver) - { - _session.messageArrived(MessageConsumerImpl.this); - } - }); } public MessageImpl receive() throws JMSException diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index ba487cc3f6..ccd5f01909 100644 --- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -230,6 +230,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession producer.close(); } _session.close(); + _connection.removeSession(this); } } @@ -765,7 +766,6 @@ public class SessionImpl implements Session, QueueSession, TopicSession return _txn; } - private class Dispatcher implements Runnable { @@ -816,7 +816,6 @@ public class SessionImpl implements Session, QueueSession, TopicSession msg = consumer.receive0(0L); } - MessageListener listener = consumer._messageListener; MessageImpl message = consumer.createJMSMessage(msg, recoveredMessage); diff --git a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java index 6996171707..a45a7f5309 100644 --- a/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java +++ b/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java @@ -566,6 +566,11 @@ public class Receiver implements DeliveryStateHandler synchronized(_endpoint.getLock()) { _messageArrivalListener = messageArrivalListener; + int prefetchSize = _prefetchQueue.size(); + for(int i = 0; i < prefetchSize; i++) + { + postPrefetchAction(); + } } } -- cgit v1.2.1