diff options
26 files changed, 232 insertions, 137 deletions
diff --git a/qpid/java/08ExcludeList b/qpid/java/08ExcludeList index 88eb754950..1d8950dddf 100644 --- a/qpid/java/08ExcludeList +++ b/qpid/java/08ExcludeList @@ -6,3 +6,4 @@ org.apache.qpid.test.testcases.FailoverTest#* org.apache.qpid.test.client.failover.FailoverTest#test4MinuteFailover // Those tests are written against the 0.10 path org.apache.qpid.test.unit.message.UTF8Test#* +org.apache.qpid.client.MessageListenerTest#testSynchronousRecieveNoWait diff --git a/qpid/java/08ExcludeList-nonvm b/qpid/java/08ExcludeList-nonvm index eb6c60b225..546dc01f5b 100644 --- a/qpid/java/08ExcludeList-nonvm +++ b/qpid/java/08ExcludeList-nonvm @@ -27,3 +27,4 @@ org.apache.qpid.server.security.acl.SimpleACLTest#* // Those tests are written against the 0.10 path org.apache.qpid.test.unit.message.UTF8Test#* +org.apache.qpid.client.MessageListenerTest#testSynchronousRecieveNoWait diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java index 93bb097268..1ac3e85f7a 100755 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java @@ -42,6 +42,8 @@ public class Listener implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { System.out.println("Message: " + xfr); diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java index 4c72ce75a5..21f9c43cd2 100755 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/Listener.java @@ -42,6 +42,8 @@ public class Listener implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { System.out.println("Message: " + xfr); diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java index aa1c9b0a41..dff49228a1 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/headers/Listener.java @@ -32,6 +32,8 @@ public class Listener implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { String body = xfr.getBodyString(); diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java index b930062813..e17d3eef9f 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/lvq/Listener.java @@ -44,6 +44,8 @@ public class Listener implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { String body = xfr.getBodyString(); diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java index 5e6d3c6f69..dd9307ca84 100755 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java @@ -40,6 +40,8 @@ public class TopicListener implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { DeliveryProperties dp = xfr.getHeader().get(DeliveryProperties.class); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 29f1aec2f5..c2fb05d94e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -239,12 +239,6 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { _conn.failoverPrep(); _qpidConnection.resume(); - - if (_conn.firePreResubscribe()) - { - _conn.resubscribeSessions(); - } - _conn.fireFailoverComplete(); return; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index 08fd49286b..d7a54cad4c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -97,7 +97,7 @@ public class AMQQueueBrowser implements QueueBrowser return new Enumeration() { - Message _nextMessage = consumer == null ? null : consumer.receive(1000); + Message _nextMessage = consumer == null ? null : consumer.receiveBrowse(); public boolean hasMoreElements() { @@ -111,7 +111,7 @@ public class AMQQueueBrowser implements QueueBrowser try { _logger.info("QB:nextElement about to receive"); - _nextMessage = consumer.receive(1000); + _nextMessage = consumer.receiveBrowse(); _logger.info("QB:nextElement received:" + _nextMessage); } catch (JMSException e) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 733bee2d81..9012632adf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -575,12 +575,19 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination) throws AMQException { + bindQueue(queueName, routingKey, arguments, exchangeName, destination, false); + } + + public void bindQueue(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, + final AMQShortString exchangeName, final AMQDestination destination, + final boolean nowait) throws AMQException + { /*new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()*/ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - sendQueueBind(queueName, routingKey, arguments, exchangeName, destination); + sendQueueBind(queueName, routingKey, arguments, exchangeName, destination, nowait); return null; } }, _connection).execute(); @@ -595,7 +602,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException; + final AMQShortString exchangeName, AMQDestination destination, + final boolean nowait) throws AMQException, FailoverException; /** * Closes the session. @@ -1815,6 +1823,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic void failoverPrep() { startDispatcherIfNecessary(); + syncDispatchQueue(); + } + + void syncDispatchQueue() + { final CountDownLatch signal = new CountDownLatch(1); _queue.add(new Dispatchable() { public void dispatch(AMQSession ssn) @@ -1828,7 +1841,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } catch (InterruptedException e) { - // pass + throw new RuntimeException(e); } } @@ -1859,6 +1872,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _inRecovery = inRecovery; } + boolean isStarted() + { + return _startedAtLeastOnce.get(); + } + /** * Starts the session, which ensures that it is not suspended and that its event dispatcher is running. * @@ -2281,7 +2299,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @todo Be aware of possible changes to parameter order as versions change. */ protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal) + final boolean noLocal) throws AMQException + { + return declareQueue(amqd, protocolHandler, noLocal, false); + } + + protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean noLocal, final boolean nowait) throws AMQException { /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ @@ -2296,14 +2320,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic amqd.setQueueName(protocolHandler.generateQueueName()); } - sendQueueDeclare(amqd, protocolHandler); + sendQueueDeclare(amqd, protocolHandler, nowait); return amqd.getAMQQueueName(); } }, _connection).execute(); } - public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException; + public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean nowait) throws AMQException, FailoverException; /** * Undeclares the specified queue. @@ -2416,14 +2441,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic AMQProtocolHandler protocolHandler = getProtocolHandler(); - declareExchange(amqd, protocolHandler, false); + declareExchange(amqd, protocolHandler, nowait); - AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal()); + AMQShortString queueName = declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait); // store the consumer queue name consumer.setQueuename(queueName); - bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait); // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch if (!_immediatePrefetch) @@ -2455,11 +2480,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { - consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector()); - } - catch (JMSException e) // thrown by getMessageSelector - { - throw new AMQException(null, e.getMessage(), e); + consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector); } catch (FailoverException e) { @@ -2531,8 +2552,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic for (C consumer : consumers) { - consumer.failedOver(); + consumer.failedOverPre(); registerConsumer(consumer, true); + consumer.failedOverPost(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index a8487b04e9..dc79555171 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -272,7 +272,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * @param arguments 0_8 specific */ public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, - final FieldTable arguments, final AMQShortString exchangeName, final AMQDestination destination) + final FieldTable arguments, final AMQShortString exchangeName, + final AMQDestination destination, final boolean nowait) throws AMQException, FailoverException { Map args = FiledTableSupport.convertToMap(arguments); @@ -287,9 +288,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString()); getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args); } - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + if (!nowait) + { + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } } @@ -501,18 +505,24 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW); } - getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, + Option.UNRELIABLE); // We need to sync so that we get notify of an error. // only if not immediat prefetch - if(prefetch() && (consumer.isStrated() || _immediatePrefetch)) + if(prefetch() && (isStarted() || _immediatePrefetch)) { // set the flow getQpidSession().messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch()); + getAMQConnection().getMaxPrefetch(), + Option.UNRELIABLE); + } + + if (!nowait) + { + getQpidSession().sync(); + getCurrentException(); } - getQpidSession().sync(); - getCurrentException(); } /** @@ -540,14 +550,18 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic null, name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE); // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + if (!nowait) + { + getQpidSession().sync(); + getCurrentException(); + } } /** * Declare a queue with the given queueName */ - public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) + public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean nowait) throws AMQException, FailoverException { // do nothing this is only used by 0_8 @@ -557,7 +571,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Declare a queue with the given queueName */ public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal) + final boolean noLocal, final boolean nowait) throws AMQException, FailoverException { AMQShortString res; @@ -581,9 +595,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic amqd.isDurable() ? Option.DURABLE : Option.NONE, !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); // passive --> false - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); + if (!nowait) + { + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } return res; } @@ -609,7 +626,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { for (BasicMessageConsumer consumer : _consumers.values()) { - getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag())); + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), + Option.UNRELIABLE); } } else @@ -625,17 +643,20 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic if (consumer.getMessageListener() != null) { getQpidSession().messageFlow(consumerTag, - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } } else { getQpidSession() .messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch()); + getAMQConnection().getMaxPrefetch(), + Option.UNRELIABLE); } getQpidSession() - .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF); + .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, + Option.UNRELIABLE); } catch (Exception e) { @@ -700,6 +721,19 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void opened(Session ssn) {} + public void resumed(Session ssn) + { + _qpidConnection = ssn.getConnection(); + try + { + resubscribe(); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + public void message(Session ssn, MessageTransfer xfr) { messageReceived(new UnprocessedMessage_0_10(xfr)); @@ -716,7 +750,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void closed(Session ssn) {} protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal) + final boolean noLocal, final boolean nowait) throws AMQException { /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ @@ -736,34 +770,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic amqd.setQueueName(new AMQShortString( binddingKey + "@" + amqd.getExchangeName().toString() + "_" + UUID.randomUUID())); } - return send0_10QueueDeclare(amqd, protocolHandler, noLocal); + return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait); } }, _connection).execute(); } - - void start() throws AMQException - { - super.start(); - for(BasicMessageConsumer c: _consumers.values()) - { - c.start(); - } - } - - - void stop() throws AMQException - { - super.stop(); - for(BasicMessageConsumer c: _consumers.values()) - { - c.stop(); - } - } - - - - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 6451ae60be..356d0dae7b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -106,7 +106,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B } public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, final AMQDestination dest) throws AMQException, FailoverException + final AMQShortString exchangeName, final AMQDestination dest, + final boolean nowait) throws AMQException, FailoverException { getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody (getTicket(),queueName,exchangeName,routingKey,false,arguments). @@ -306,7 +307,8 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } - public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException + public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean nowait) throws AMQException, FailoverException { QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 76422c6297..2bb443a090 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -441,7 +441,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa o = _synchronousQueue.take(); } return o; - } + } + + abstract Message receiveBrowse() throws JMSException; public Message receiveNoWait() throws JMSException { @@ -1037,23 +1039,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa _synchronousQueue.clear(); } - public void start() - { - // do nothing as this is a 0_10 feature - } - - - public void stop() - { - // do nothing as this is a 0_10 feature - } - - public boolean isStrated() - { - // do nothing as this is a 0_10 feature - return false; - } - public AMQShortString getQueuename() { return _queuename; @@ -1070,10 +1055,13 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } /** to be called when a failover has occured */ - public void failedOver() + public void failedOverPre() { clearReceiveQueue(); // TGM FIXME: think this should just be removed // clearUnackedMessages(); } + + public void failedOverPost() {} + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 7d535643c0..9db2007e1a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -31,6 +31,7 @@ import org.apache.qpid.filter.JMSSelectorFilter; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageListener; import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; @@ -148,7 +149,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (isMessageListenerSet() && ! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } _logger.debug("messageOk, trying to notify"); super.notifyMessage(jmsMessage); @@ -246,7 +248,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if(! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } } // now we need to acquire this message if needed @@ -335,7 +338,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (messageListener != null && ! getSession().prefetch()) { _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } if (messageListener != null && !_synchronousQueue.isEmpty()) { @@ -349,26 +353,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } - public boolean isStrated() + public void failedOverPost() { - return _isStarted; - } - - public void start() - { - _isStarted = true; - if (_syncReceive.get()) + if (_0_10session.isStarted() && _syncReceive.get()) { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); } } - public void stop() - { - _isStarted = false; - } - /** * When messages are not prefetched we need to request a message from the * broker. @@ -380,16 +374,35 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM */ public Object getMessageFromQueue(long l) throws InterruptedException { - if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty()) - { - _0_10session.getQpidSession().messageFlow(getConsumerTagString(), - MessageCreditUnit.MESSAGE, 1); - } if (! getSession().prefetch()) { _syncReceive.set(true); } + if (_0_10session.isStarted() && ! getSession().prefetch() && _synchronousQueue.isEmpty()) + { + _0_10session.getQpidSession().messageFlow(getConsumerTagString(), + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } Object o = super.getMessageFromQueue(l); + if (o == null) + { + _0_10session.getQpidSession().messageFlush + (getConsumerTagString(), Option.UNRELIABLE); + _0_10session.getQpidSession().sync(); + if (getSession().prefetch()) + { + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.BYTE, + 0xFFFFFFFF, Option.UNRELIABLE); + _0_10session.getQpidSession().messageFlow + (getConsumerTagString(), MessageCreditUnit.MESSAGE, + _0_10session.getAMQConnection().getMaxPrefetch(), + Option.UNRELIABLE); + } + _0_10session.syncDispatchQueue(); + o = super.getMessageFromQueue(-1); + } if (! getSession().prefetch()) { _syncReceive.set(false); @@ -406,4 +419,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } + Message receiveBrowse() throws JMSException + { + return receiveNoWait(); + } + } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 494a8fb43d..db85ac37d0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -22,6 +22,7 @@ package org.apache.qpid.client; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; +import javax.jms.Message; import org.apache.qpid.AMQException; import org.apache.qpid.QpidException; @@ -38,9 +39,9 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe protected final Logger _logger = LoggerFactory.getLogger(getClass()); protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination, - String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException + String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, + AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException { super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session, protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive, @@ -73,13 +74,18 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe } } - public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception - { + public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception + { return _messageFactory.createMessage(messageFrame.getDeliveryTag(), - messageFrame.isRedelivered(), messageFrame.getExchange(), - messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + messageFrame.isRedelivered(), messageFrame.getExchange(), + messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies()); + + } + Message receiveBrowse() throws JMSException + { + return receive(1000); } } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index a881f6a822..566a222897 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -44,7 +44,9 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe } - public void sendQueueBind(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName, AMQDestination destination) throws AMQException, FailoverException + public void sendQueueBind(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, + AMQShortString exchangeName, AMQDestination destination, + boolean nowait) throws AMQException, FailoverException { } @@ -129,7 +131,8 @@ public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMe } - public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException, FailoverException + public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler, + boolean nowait) throws AMQException, FailoverException { } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java index 3491af8cd2..5b2db10613 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java @@ -37,6 +37,8 @@ class ToyClient implements SessionListener { public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void exception(Session ssn, SessionException exc) { exc.printStackTrace(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java index c1031c9a1c..0e969464ab 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java @@ -37,6 +37,8 @@ public class Echo implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { int id = xfr.getId(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 4079097f96..9d2686a6f7 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -53,13 +53,15 @@ public class Session extends SessionInvoker private static final Logger log = Logger.get(Session.class); - enum State { NEW, DETACHED, OPEN, CLOSING, CLOSED } + enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED } class DefaultSessionListener implements SessionListener { public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { log.info("message: %s", xfr); @@ -107,6 +109,8 @@ public class Session extends SessionInvoker private volatile boolean flowControl = false; private Semaphore credit = new Semaphore(0); + private Thread resumer = null; + Session(Connection connection, Binary name, long expiry) { this.connection = connection; @@ -234,15 +238,21 @@ public class Session extends SessionInvoker for (int i = maxComplete + 1; lt(i, commandsOut); i++) { Method m = commands[mod(i, commands.length)]; - if (m != null) + if (m == null) { - sessionCommandPoint(m.getId(), 0); - send(m); + m = new ExecutionSync(); + m.setId(i); } + sessionCommandPoint(m.getId(), 0); + send(m); } sessionCommandPoint(commandsOut, 0); sessionFlush(COMPLETED); + resumer = Thread.currentThread(); + state = RESUMING; + listener.resumed(this); + resumer = null; } } @@ -387,7 +397,7 @@ public class Session extends SessionInvoker synchronized (commands) { - if (state == DETACHED) + if (state == DETACHED || state == CLOSING) { return; } @@ -499,10 +509,14 @@ public class Session extends SessionInvoker if (state != OPEN && state != CLOSED) { - Waiter w = new Waiter(commands, timeout); - while (w.hasTime() && (state != OPEN && state != CLOSED)) + Thread current = Thread.currentThread(); + if (!current.equals(resumer)) { - w.await(); + Waiter w = new Waiter(commands, timeout); + while (w.hasTime() && (state != OPEN && state != CLOSED)) + { + w.await(); + } } } @@ -510,6 +524,14 @@ public class Session extends SessionInvoker { case OPEN: break; + case RESUMING: + Thread current = Thread.currentThread(); + if (!current.equals(resumer)) + { + throw new SessionException + ("timed out waiting for resume to finish"); + } + break; case CLOSED: throw new SessionClosedException(); default: @@ -527,7 +549,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commands, timeout); while (w.hasTime() && isFull(next)) { - if (state == OPEN) + if (state == OPEN || state == RESUMING) { try { @@ -560,7 +582,7 @@ public class Session extends SessionInvoker { sessionCommandPoint(0, 0); } - if (expiry > 0) + if (expiry > 0 && !m.isUnreliable()) { commands[mod(next, commands.length)] = m; commandBytes += m.getBodySize(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java index 63690177f9..eb650eb9ed 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java @@ -31,6 +31,8 @@ public interface SessionListener void opened(Session session); + void resumed(Session session); + void message(Session ssn, MessageTransfer xfr); void exception(Session session, SessionException exception); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java index 622993effb..88870284f6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java @@ -87,6 +87,8 @@ public class Sink implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { count++; diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index dca6264367..3d634bfb70 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -74,6 +74,8 @@ public class ConnectionTest extends TestCase implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(final Session ssn, MessageTransfer xfr) { if (queue) @@ -277,6 +279,7 @@ public class ConnectionTest extends TestCase implements SessionListener class TestSessionListener implements SessionListener { public void opened(Session s) {} + public void resumed(Session s) {} public void exception(Session s, SessionException e) {} public void message(Session s, MessageTransfer xfr) { diff --git a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java index a12993d40e..ee41beaf50 100644 --- a/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java +++ b/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java @@ -86,6 +86,8 @@ public class QpidService implements SessionListener public void opened(Session ssn) {} + public void resumed(Session ssn) {} + public void message(Session ssn, MessageTransfer xfr) { MessagePartListenerAdapter l = _listeners.get(xfr.getDestination()); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java index 4c1d5ee9c1..ffec6c7a29 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java @@ -106,6 +106,14 @@ public class MessageListenerTest extends QpidTestCase implements MessageListener } } + public void testSynchronousRecieveNoWait() throws Exception + { + for (int msg = 0; msg < MSG_COUNT; msg++) + { + assertTrue(_consumer.receiveNoWait() != null); + } + } + public void testAsynchronousRecieve() throws Exception { _consumer.setMessageListener(this); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java index cd921f0971..4c4ef0320c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/failover/FailoverTest.java @@ -230,12 +230,6 @@ public class FailoverTest extends FailoverBaseCase implements ConnectionListener causeFailure(DEFAULT_FAILOVER_TIME); - if (!CLUSTERED) - { - msg = consumer.receive(500); - assertNull("Should not have received message from new broker!", msg); - } - // Check that you produce and consume the rest of messages. _logger.debug("=================="); _logger.debug("Sending " + (totalMessages-toProduce) + " messages"); diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java index 4bba7b113d..9770adceb0 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java @@ -696,6 +696,8 @@ public class QpidBench public void opened(org.apache.qpid.transport.Session ssn) {} + public void resumed(org.apache.qpid.transport.Session ssn) {} + public void exception(org.apache.qpid.transport.Session ssn, SessionException exc) { |
