summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-05-11 11:05:32 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-05-11 11:05:32 +0000
commit14d3de969b10a86fa8c1531ea22db0d9143d1b3e (patch)
treeb132b537da65f351bdc16a0a2cace6fb895600ff /qpid/java
parent448ef5e4a964ba15b05b0963f5fb6deb0a64908c (diff)
downloadqpid-python-14d3de969b10a86fa8c1531ea22db0d9143d1b3e.tar.gz
QPID-4829 : [JMS AMQP 1.0] Sessions added to started connections are not themselves started
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1481290 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java21
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java15
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java3
-rw-r--r--qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java5
4 files changed, 30 insertions, 14 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
index 73701889b5..4028d32e15 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
@@ -163,6 +163,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
connect();
started = true;
}
+
try
{
SessionImpl session = new SessionImpl(this, acknowledgeMode);
@@ -170,6 +171,11 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
session.setTopicSession(_isTopicConnection);
_sessions.add(session);
+ if(_state == State.STARTED)
+ {
+ session.start();
+ }
+
return session;
}
catch(JMSException e)
@@ -191,9 +197,17 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
throw e;
}
}
-
}
+
+ }
+
+ void removeSession(SessionImpl session)
+ {
+ synchronized (_lock)
+ {
+ _sessions.remove(session);
+ }
}
private void reconnect(String networkHost, int port, String hostName)
@@ -410,10 +424,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
public boolean isStarted()
{
- synchronized (_lock)
- {
- return _state == State.STARTED;
- }
+ return _state == State.STARTED;
}
void setQueueConnection(final boolean queueConnection)
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
index def5ae5931..cb42c49bbc 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
@@ -188,15 +188,16 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
{
checkClosed();
_messageListener = messageListener;
- _session.messageListenerSet( this );
_receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener()
- {
+ {
+
+ public void messageArrived(final Receiver receiver)
+ {
+ _session.messageArrived(MessageConsumerImpl.this);
+ }
+ });
+ _session.messageListenerSet( this );
- public void messageArrived(final Receiver receiver)
- {
- _session.messageArrived(MessageConsumerImpl.this);
- }
- });
}
public MessageImpl receive() throws JMSException
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index ba487cc3f6..ccd5f01909 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -230,6 +230,7 @@ public class SessionImpl implements Session, QueueSession, TopicSession
producer.close();
}
_session.close();
+ _connection.removeSession(this);
}
}
@@ -765,7 +766,6 @@ public class SessionImpl implements Session, QueueSession, TopicSession
return _txn;
}
-
private class Dispatcher implements Runnable
{
@@ -816,7 +816,6 @@ public class SessionImpl implements Session, QueueSession, TopicSession
msg = consumer.receive0(0L);
}
-
MessageListener listener = consumer._messageListener;
MessageImpl message = consumer.createJMSMessage(msg, recoveredMessage);
diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
index 6996171707..a45a7f5309 100644
--- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
+++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
@@ -566,6 +566,11 @@ public class Receiver implements DeliveryStateHandler
synchronized(_endpoint.getLock())
{
_messageArrivalListener = messageArrivalListener;
+ int prefetchSize = _prefetchQueue.size();
+ for(int i = 0; i < prefetchSize; i++)
+ {
+ postPrefetchAction();
+ }
}
}