summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/08ExcludeList1
-rw-r--r--qpid/java/08ExcludeList-nonvm1
-rwxr-xr-xqpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java2
-rwxr-xr-xqpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java2
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java2
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java2
-rwxr-xr-xqpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java52
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java101
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java26
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java64
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java20
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java42
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java2
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java3
-rw-r--r--qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java8
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java6
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java2
26 files changed, 232 insertions, 137 deletions
diff --git a/qpid/java/08ExcludeList b/qpid/java/08ExcludeList
index 88eb754950..1d8950dddf 100644
--- a/qpid/java/08ExcludeList
+++ b/qpid/java/08ExcludeList
@@ -6,3 +6,4 @@ org.apache.qpid.test.testcases.FailoverTest#*
org.apache.qpid.test.client.failover.FailoverTest#test4MinuteFailover
// Those tests are written against the 0.10 path
org.apache.qpid.test.unit.message.UTF8Test#*
+org.apache.qpid.client.MessageListenerTest#testSynchronousRecieveNoWait
diff --git a/qpid/java/08ExcludeList-nonvm b/qpid/java/08ExcludeList-nonvm
index eb6c60b225..546dc01f5b 100644
--- a/qpid/java/08ExcludeList-nonvm
+++ b/qpid/java/08ExcludeList-nonvm
@@ -27,3 +27,4 @@ org.apache.qpid.server.security.acl.SimpleACLTest#*
// Those tests are written against the 0.10 path
org.apache.qpid.test.unit.message.UTF8Test#*
+org.apache.qpid.client.MessageListenerTest#testSynchronousRecieveNoWait
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
index 93bb097268..1ac3e85f7a 100755
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
@@ -42,6 +42,8 @@ public class Listener implements SessionListener
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
System.out.println("Message: " + xfr);
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java
index 4c72ce75a5..21f9c43cd2 100755
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java
@@ -42,6 +42,8 @@ public class Listener implements SessionListener
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
System.out.println("Message: " + xfr);
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java
index aa1c9b0a41..dff49228a1 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java
@@ -32,6 +32,8 @@ public class Listener implements SessionListener
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
String body = xfr.getBodyString();
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java
index b930062813..e17d3eef9f 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java
@@ -44,6 +44,8 @@ public class Listener implements SessionListener
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
String body = xfr.getBodyString();
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
index 5e6d3c6f69..dd9307ca84 100755
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
@@ -40,6 +40,8 @@ public class TopicListener implements SessionListener
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
DeliveryProperties dp = xfr.getHeader().get(DeliveryProperties.class);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 29f1aec2f5..c2fb05d94e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -239,12 +239,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
_conn.failoverPrep();
_qpidConnection.resume();
-
- if (_conn.firePreResubscribe())
- {
- _conn.resubscribeSessions();
- }
-
_conn.fireFailoverComplete();
return;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
index 08fd49286b..d7a54cad4c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
@@ -97,7 +97,7 @@ public class AMQQueueBrowser implements QueueBrowser
return new Enumeration()
{
- Message _nextMessage = consumer == null ? null : consumer.receive(1000);
+ Message _nextMessage = consumer == null ? null : consumer.receiveBrowse();
public boolean hasMoreElements()
{
@@ -111,7 +111,7 @@ public class AMQQueueBrowser implements QueueBrowser
try
{
_logger.info("QB:nextElement about to receive");
- _nextMessage = consumer.receive(1000);
+ _nextMessage = consumer.receiveBrowse();
_logger.info("QB:nextElement received:" + _nextMessage);
}
catch (JMSException e)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 733bee2d81..9012632adf 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -575,12 +575,19 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
final AMQShortString exchangeName, final AMQDestination destination) throws AMQException
{
+ bindQueue(queueName, routingKey, arguments, exchangeName, destination, false);
+ }
+
+ public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
+ final AMQShortString exchangeName, final AMQDestination destination,
+ final boolean nowait) throws AMQException
+ {
/*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/
new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- sendQueueBind(queueName, routingKey, arguments, exchangeName, destination);
+ sendQueueBind(queueName, routingKey, arguments, exchangeName, destination, nowait);
return null;
}
}, _connection).execute();
@@ -595,7 +602,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException;
+ final AMQShortString exchangeName, AMQDestination destination,
+ final boolean nowait) throws AMQException, FailoverException;
/**
* Closes the session.
@@ -1815,6 +1823,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
void failoverPrep()
{
startDispatcherIfNecessary();
+ syncDispatchQueue();
+ }
+
+ void syncDispatchQueue()
+ {
final CountDownLatch signal = new CountDownLatch(1);
_queue.add(new Dispatchable() {
public void dispatch(AMQSession ssn)
@@ -1828,7 +1841,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
catch (InterruptedException e)
{
- // pass
+ throw new RuntimeException(e);
}
}
@@ -1859,6 +1872,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_inRecovery = inRecovery;
}
+ boolean isStarted()
+ {
+ return _startedAtLeastOnce.get();
+ }
+
/**
* Starts the session, which ensures that it is not suspended and that its event dispatcher is running.
*
@@ -2281,7 +2299,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @todo Be aware of possible changes to parameter order as versions change.
*/
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal)
+ final boolean noLocal) throws AMQException
+ {
+ return declareQueue(amqd, protocolHandler, noLocal, false);
+ }
+
+ protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean noLocal, final boolean nowait)
throws AMQException
{
/*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
@@ -2296,14 +2320,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
amqd.setQueueName(protocolHandler.generateQueueName());
}
- sendQueueDeclare(amqd, protocolHandler);
+ sendQueueDeclare(amqd, protocolHandler, nowait);
return amqd.getAMQQueueName();
}
}, _connection).execute();
}
- public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException;
+ public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean nowait) throws AMQException, FailoverException;
/**
* Undeclares the specified queue.
@@ -2416,14 +2441,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
AMQProtocolHandler protocolHandler = getProtocolHandler();
- declareExchange(amqd, protocolHandler, false);
+ declareExchange(amqd, protocolHandler, nowait);
- AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal());
+ AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
// store the consumer queue name
consumer.setQueuename(queueName);
- bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd);
+ bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
if (!_immediatePrefetch)
@@ -2455,11 +2480,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
- }
- catch (JMSException e) // thrown by getMessageSelector
- {
- throw new AMQException(null, e.getMessage(), e);
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector);
}
catch (FailoverException e)
{
@@ -2531,8 +2552,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
for (C consumer : consumers)
{
- consumer.failedOver();
+ consumer.failedOverPre();
registerConsumer(consumer, true);
+ consumer.failedOverPost();
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index a8487b04e9..dc79555171 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -272,7 +272,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* @param arguments 0_8 specific
*/
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
- final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination)
+ final FieldTable arguments, final AMQShortString exchangeName,
+ final AMQDestination destination, final boolean nowait)
throws AMQException, FailoverException
{
Map args = FiledTableSupport.convertToMap(arguments);
@@ -287,9 +288,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
_logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
}
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ if (!nowait)
+ {
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
}
@@ -501,18 +505,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW);
}
- getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
+ Option.UNRELIABLE);
// We need to sync so that we get notify of an error.
// only if not immediat prefetch
- if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
+ if(prefetch() && (isStarted() || _immediatePrefetch))
{
// set the flow
getQpidSession().messageFlow(consumerTag,
MessageCreditUnit.MESSAGE,
- getAMQConnection().getMaxPrefetch());
+ getAMQConnection().getMaxPrefetch(),
+ Option.UNRELIABLE);
+ }
+
+ if (!nowait)
+ {
+ getQpidSession().sync();
+ getCurrentException();
}
- getQpidSession().sync();
- getCurrentException();
}
/**
@@ -540,14 +550,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
null,
name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE);
// We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ if (!nowait)
+ {
+ getQpidSession().sync();
+ getCurrentException();
+ }
}
/**
* Declare a queue with the given queueName
*/
- public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
+ public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean nowait)
throws AMQException, FailoverException
{
// do nothing this is only used by 0_8
@@ -557,7 +571,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Declare a queue with the given queueName
*/
public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal)
+ final boolean noLocal, final boolean nowait)
throws AMQException, FailoverException
{
AMQShortString res;
@@ -581,9 +595,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
amqd.isDurable() ? Option.DURABLE : Option.NONE,
!amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
// passive --> false
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
+ if (!nowait)
+ {
+ // We need to sync so that we get notify of an error.
+ getQpidSession().sync();
+ getCurrentException();
+ }
return res;
}
@@ -609,7 +626,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
for (BasicMessageConsumer consumer : _consumers.values())
{
- getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()));
+ getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+ Option.UNRELIABLE);
}
}
else
@@ -625,17 +643,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
if (consumer.getMessageListener() != null)
{
getQpidSession().messageFlow(consumerTag,
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
else
{
getQpidSession()
.messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
- getAMQConnection().getMaxPrefetch());
+ getAMQConnection().getMaxPrefetch(),
+ Option.UNRELIABLE);
}
getQpidSession()
- .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
+ .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
+ Option.UNRELIABLE);
}
catch (Exception e)
{
@@ -700,6 +721,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void opened(Session ssn) {}
+ public void resumed(Session ssn)
+ {
+ _qpidConnection = ssn.getConnection();
+ try
+ {
+ resubscribe();
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
public void message(Session ssn, MessageTransfer xfr)
{
messageReceived(new UnprocessedMessage_0_10(xfr));
@@ -716,7 +750,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void closed(Session ssn) {}
protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal)
+ final boolean noLocal, final boolean nowait)
throws AMQException
{
/*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
@@ -736,34 +770,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
amqd.setQueueName(new AMQShortString( binddingKey + "@"
+ amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
}
- return send0_10QueueDeclare(amqd, protocolHandler, noLocal);
+ return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait);
}
}, _connection).execute();
}
-
- void start() throws AMQException
- {
- super.start();
- for(BasicMessageConsumer c: _consumers.values())
- {
- c.start();
- }
- }
-
-
- void stop() throws AMQException
- {
- super.stop();
- for(BasicMessageConsumer c: _consumers.values())
- {
- c.stop();
- }
- }
-
-
-
-
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 6451ae60be..356d0dae7b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -106,7 +106,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
}
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException
+ final AMQShortString exchangeName, final AMQDestination dest,
+ final boolean nowait) throws AMQException, FailoverException
{
getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
(getTicket(),queueName,exchangeName,routingKey,false,arguments).
@@ -306,7 +307,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
- public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
+ public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ final boolean nowait) throws AMQException, FailoverException
{
QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 76422c6297..2bb443a090 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -441,7 +441,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
o = _synchronousQueue.take();
}
return o;
- }
+ }
+
+ abstract Message receiveBrowse() throws JMSException;
public Message receiveNoWait() throws JMSException
{
@@ -1037,23 +1039,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
_synchronousQueue.clear();
}
- public void start()
- {
- // do nothing as this is a 0_10 feature
- }
-
-
- public void stop()
- {
- // do nothing as this is a 0_10 feature
- }
-
- public boolean isStrated()
- {
- // do nothing as this is a 0_10 feature
- return false;
- }
-
public AMQShortString getQueuename()
{
return _queuename;
@@ -1070,10 +1055,13 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
/** to be called when a failover has occured */
- public void failedOver()
+ public void failedOverPre()
{
clearReceiveQueue();
// TGM FIXME: think this should just be removed
// clearUnackedMessages();
}
+
+ public void failedOverPost() {}
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 7d535643c0..9db2007e1a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -31,6 +31,7 @@ import org.apache.qpid.filter.JMSSelectorFilter;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -148,7 +149,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (isMessageListenerSet() && ! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
@@ -246,7 +248,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if(! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
// now we need to acquire this message if needed
@@ -335,7 +338,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (messageListener != null && ! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
if (messageListener != null && !_synchronousQueue.isEmpty())
{
@@ -349,26 +353,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
- public boolean isStrated()
+ public void failedOverPost()
{
- return _isStarted;
- }
-
- public void start()
- {
- _isStarted = true;
- if (_syncReceive.get())
+ if (_0_10session.isStarted() && _syncReceive.get())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
}
}
- public void stop()
- {
- _isStarted = false;
- }
-
/**
* When messages are not prefetched we need to request a message from the
* broker.
@@ -380,16 +374,35 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
*/
public Object getMessageFromQueue(long l) throws InterruptedException
{
- if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1);
- }
if (! getSession().prefetch())
{
_syncReceive.set(true);
}
+ if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+ }
Object o = super.getMessageFromQueue(l);
+ if (o == null)
+ {
+ _0_10session.getQpidSession().messageFlush
+ (getConsumerTagString(), Option.UNRELIABLE);
+ _0_10session.getQpidSession().sync();
+ if (getSession().prefetch())
+ {
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.BYTE,
+ 0xFFFFFFFF, Option.UNRELIABLE);
+ _0_10session.getQpidSession().messageFlow
+ (getConsumerTagString(), MessageCreditUnit.MESSAGE,
+ _0_10session.getAMQConnection().getMaxPrefetch(),
+ Option.UNRELIABLE);
+ }
+ _0_10session.syncDispatchQueue();
+ o = super.getMessageFromQueue(-1);
+ }
if (! getSession().prefetch())
{
_syncReceive.set(false);
@@ -406,4 +419,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
+ Message receiveBrowse() throws JMSException
+ {
+ return receiveNoWait();
+ }
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 494a8fb43d..db85ac37d0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import javax.jms.Message;
import org.apache.qpid.AMQException;
import org.apache.qpid.QpidException;
@@ -38,9 +39,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
protected final Logger _logger = LoggerFactory.getLogger(getClass());
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
- String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
+ String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
{
super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
@@ -73,13 +74,18 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
}
}
- public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
- {
+ public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
+ {
return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
- messageFrame.isRedelivered(), messageFrame.getExchange(),
- messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+ messageFrame.isRedelivered(), messageFrame.getExchange(),
+ messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies());
+
+ }
+ Message receiveBrowse() throws JMSException
+ {
+ return receive(1000);
}
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index a881f6a822..566a222897 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -44,7 +44,9 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
- public void sendQueueBind(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException
+ public void sendQueueBind(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments,
+ AMQShortString exchangeName, AMQDestination destination,
+ boolean nowait) throws AMQException, FailoverException
{
}
@@ -129,7 +131,8 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
- public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
+ public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler,
+ boolean nowait) throws AMQException, FailoverException
{
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
index 3491af8cd2..5b2db10613 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
@@ -37,6 +37,8 @@ class ToyClient implements SessionListener
{
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void exception(Session ssn, SessionException exc)
{
exc.printStackTrace();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
index c1031c9a1c..0e969464ab 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
@@ -37,6 +37,8 @@ public class Echo implements SessionListener
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
int id = xfr.getId();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 4079097f96..9d2686a6f7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -53,13 +53,15 @@ public class Session extends SessionInvoker
private static final Logger log = Logger.get(Session.class);
- enum State { NEW, DETACHED, OPEN, CLOSING, CLOSED }
+ enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
class DefaultSessionListener implements SessionListener
{
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
log.info("message: %s", xfr);
@@ -107,6 +109,8 @@ public class Session extends SessionInvoker
private volatile boolean flowControl = false;
private Semaphore credit = new Semaphore(0);
+ private Thread resumer = null;
+
Session(Connection connection, Binary name, long expiry)
{
this.connection = connection;
@@ -234,15 +238,21 @@ public class Session extends SessionInvoker
for (int i = maxComplete + 1; lt(i, commandsOut); i++)
{
Method m = commands[mod(i, commands.length)];
- if (m != null)
+ if (m == null)
{
- sessionCommandPoint(m.getId(), 0);
- send(m);
+ m = new ExecutionSync();
+ m.setId(i);
}
+ sessionCommandPoint(m.getId(), 0);
+ send(m);
}
sessionCommandPoint(commandsOut, 0);
sessionFlush(COMPLETED);
+ resumer = Thread.currentThread();
+ state = RESUMING;
+ listener.resumed(this);
+ resumer = null;
}
}
@@ -387,7 +397,7 @@ public class Session extends SessionInvoker
synchronized (commands)
{
- if (state == DETACHED)
+ if (state == DETACHED || state == CLOSING)
{
return;
}
@@ -499,10 +509,14 @@ public class Session extends SessionInvoker
if (state != OPEN && state != CLOSED)
{
- Waiter w = new Waiter(commands, timeout);
- while (w.hasTime() && (state != OPEN && state != CLOSED))
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
{
- w.await();
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && (state != OPEN && state != CLOSED))
+ {
+ w.await();
+ }
}
}
@@ -510,6 +524,14 @@ public class Session extends SessionInvoker
{
case OPEN:
break;
+ case RESUMING:
+ Thread current = Thread.currentThread();
+ if (!current.equals(resumer))
+ {
+ throw new SessionException
+ ("timed out waiting for resume to finish");
+ }
+ break;
case CLOSED:
throw new SessionClosedException();
default:
@@ -527,7 +549,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commands, timeout);
while (w.hasTime() && isFull(next))
{
- if (state == OPEN)
+ if (state == OPEN || state == RESUMING)
{
try
{
@@ -560,7 +582,7 @@ public class Session extends SessionInvoker
{
sessionCommandPoint(0, 0);
}
- if (expiry > 0)
+ if (expiry > 0 && !m.isUnreliable())
{
commands[mod(next, commands.length)] = m;
commandBytes += m.getBodySize();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
index 63690177f9..eb650eb9ed 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
@@ -31,6 +31,8 @@ public interface SessionListener
void opened(Session session);
+ void resumed(Session session);
+
void message(Session ssn, MessageTransfer xfr);
void exception(Session session, SessionException exception);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java
index 622993effb..88870284f6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java
@@ -87,6 +87,8 @@ public class Sink implements SessionListener
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
count++;
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index dca6264367..3d634bfb70 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -74,6 +74,8 @@ public class ConnectionTest extends TestCase implements SessionListener
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(final Session ssn, MessageTransfer xfr)
{
if (queue)
@@ -277,6 +279,7 @@ public class ConnectionTest extends TestCase implements SessionListener
class TestSessionListener implements SessionListener
{
public void opened(Session s) {}
+ public void resumed(Session s) {}
public void exception(Session s, SessionException e) {}
public void message(Session s, MessageTransfer xfr)
{
diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
index a12993d40e..ee41beaf50 100644
--- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
+++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
@@ -86,6 +86,8 @@ public class QpidService implements SessionListener
public void opened(Session ssn) {}
+ public void resumed(Session ssn) {}
+
public void message(Session ssn, MessageTransfer xfr)
{
MessagePartListenerAdapter l = _listeners.get(xfr.getDestination());
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java
index 4c1d5ee9c1..ffec6c7a29 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java
@@ -106,6 +106,14 @@ public class MessageListenerTest extends QpidTestCase implements MessageListener
}
}
+ public void testSynchronousRecieveNoWait() throws Exception
+ {
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ assertTrue(_consumer.receiveNoWait() != null);
+ }
+ }
+
public void testAsynchronousRecieve() throws Exception
{
_consumer.setMessageListener(this);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
index cd921f0971..4c4ef0320c 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java
@@ -230,12 +230,6 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener
causeFailure(DEFAULT_FAILOVER_TIME);
- if (!CLUSTERED)
- {
- msg = consumer.receive(500);
- assertNull("Should not have received message from new broker!", msg);
- }
-
// Check that you produce and consume the rest of messages.
_logger.debug("==================");
_logger.debug("Sending " + (totalMessages-toProduce) + " messages");
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
index 4bba7b113d..9770adceb0 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
@@ -696,6 +696,8 @@ public class QpidBench
public void opened(org.apache.qpid.transport.Session ssn) {}
+ public void resumed(org.apache.qpid.transport.Session ssn) {}
+
public void exception(org.apache.qpid.transport.Session ssn,
SessionException exc)
{