From 9ebcf9839197cafe78beb8dfa14b803bd78f5a5e Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Tue, 21 Aug 2007 23:16:44 +0000 Subject: I added support in the JMS layer to figure out if it received any messages after calling flush git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@568321 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpidity/client/Session.java | 20 +++++------------ .../apache/qpidity/client/impl/ClientSession.java | 20 +++++------------ .../qpidity/client/impl/ClientSessionDelegate.java | 10 --------- .../apache/qpidity/jms/MessageConsumerImpl.java | 26 +++++++++++++++++----- .../apache/qpidity/jms/QpidMessageListener.java | 3 +++ 5 files changed, 33 insertions(+), 46 deletions(-) (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index 438f8f0605..d8be937e46 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -233,7 +233,7 @@ public interface Session * published them. *
  • {@link Option#EXCLUSIVE}:

    Request exclusive subscription access, meaning only this * ubscription can access the queue. - *

  • {@link Option#NO_OPTION}:

    Has no effect as it represents an “empty” option. + *

  • {@link Option#NO_OPTION}:

    Has no effect as it represents an �empty� option. * * * @param queue The queue this receiver is receiving messages from. @@ -423,16 +423,6 @@ public interface Session */ public void messageRelease(RangeSet ranges); - - /** - * Returns the number of message received for this session since - * {@link Session#messageFlow} has bee invoked. - * - * @return The number of message received for this session since - * {@link Session#messageFlow} has bee invoked. - */ - public int messagesReceived(); - // ----------------------------------------------- // Local transaction methods // ---------------------------------------------- @@ -478,7 +468,7 @@ public interface Session * declaring connection closes. *

  • {@link Option#PASSIVE}:

    If set, the server will not create the queue. * This field allows the client to assert the presence of a queue without modifying the server state. - *

  • {@link Option#NO_OPTION}:

    Has no effect as it represents an “empty” option. + *

  • {@link Option#NO_OPTION}:

    Has no effect as it represents an �empty� option. * *

    In the absence of a particular option, the defaul value is false for each option * @@ -540,7 +530,7 @@ public interface Session *

  • {@link Option#IF_EMPTY}:

    If set, the server will only delete the queue if it has no messages. *

  • {@link Option#IF_UNUSED}:

    If set, the server will only delete the queue if it has no consumers. * If the queue has consumers the server does does not delete it but raises a channel exception instead. - *

  • {@link Option#NO_OPTION}:

    Has no effect as it represents an “empty” option. + *

  • {@link Option#NO_OPTION}:

    Has no effect as it represents an �empty� option. * *

    *

    @@ -569,7 +559,7 @@ public interface Session * exchanges) are purged if/when a server restarts. *

  • {@link Option#PASSIVE}:

    If set, the server will not create the exchange. * The client can use this to check whether an exchange exists without modifying the server state. - *

  • {@link Option#NO_OPTION}:

    Has no effect as it represents an “empty” option. + *

  • {@link Option#NO_OPTION}:

    Has no effect as it represents an �empty� option. * *

    In the absence of a particular option, the defaul value is false for each option

    * @@ -596,7 +586,7 @@ public interface Session *
  • {@link Option#IF_UNUSED}:

    If set, the server will only delete the exchange if it has no queue bindings. If the * exchange has queue bindings the server does not delete it but raises a channel exception * instead. - *

  • {@link Option#NO_OPTION}:

    Has no effect as it represents an “empty” option. + *

  • {@link Option#NO_OPTION}:

    Has no effect as it represents an �empty� option. * *

    In the absence of a particular option, the defaul value is false for each option * diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java index 6c2d4d477f..9793d3ad8b 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java @@ -22,19 +22,7 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa private ExceptionListener _exceptionListner; private RangeSet _acquiredMessages; private RangeSet _rejectedMessages; - - - public int messagesReceived() - { - // TODO - return 1; - } - - @Override public void sessionClose() - { - super.sessionClose(); - } - + public void messageAcknowledge(RangeSet ranges) { for (Range range : ranges) @@ -97,6 +85,10 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa public void setMessageListener(String destination, MessagePartListener listener) { + if (listener == null) + { + throw new IllegalArgumentException("Cannot set message listener to null"); + } _messageListeners.put(destination, listener); } @@ -105,8 +97,6 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa _exceptionListner = exceptionListner; } - // ugly but nessacery - void setAccquiredMessages(RangeSet acquiredMessages) { _acquiredMessages = acquiredMessages; diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java index dc72f1f975..17646b631e 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java @@ -9,7 +9,6 @@ import org.apache.qpidity.MessageReject; import org.apache.qpidity.MessageTransfer; import org.apache.qpidity.QpidException; import org.apache.qpidity.Range; -import org.apache.qpidity.RangeSet; import org.apache.qpidity.Session; import org.apache.qpidity.SessionClosed; import org.apache.qpidity.SessionDelegate; @@ -54,17 +53,8 @@ public class ClientSessionDelegate extends SessionDelegate _currentTransfer = currentTransfer; _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination()); _currentMessageListener.messageTransfer(currentTransfer.getId()); - - //a better way is to tell the broker to stop the transfer - if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1) - { - RangeSet transfers = new RangeSet(); - transfers.add(_currentTransfer.getId()); - session.messageRelease(transfers); - } } - @Override public void messageReject(Session session, MessageReject struct) { for (Range range : struct.getTransfers()) diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index e726288f89..8bab833ddf 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -17,6 +17,8 @@ */ package org.apache.qpidity.jms; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.qpidity.jms.message.QpidMessage; import org.apache.qpidity.RangeSet; import org.apache.qpidity.QpidException; @@ -94,6 +96,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer * Nether exceed MAX_MESSAGE_TRANSFERRED */ private int _messageAsyncrhonouslyReceived = 0; + + private AtomicBoolean _messageReceived = new AtomicBoolean(); //----- Constructors /** @@ -354,7 +358,6 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // This indicate to the delivery thread to deliver the message to this consumer // as it can happens that a message is delivered after a receive operation as returned. _isReceiving = true; - int received = 0; if (!_isStopped) { // if this consumer is stopped then this will be call when starting @@ -362,10 +365,13 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); getSession().getQpidSession().messageFlush(getMessageActorID()); + _messageReceived.set(false); + + //When sync() returns we know whether we have received a message or not. getSession().getQpidSession().sync(); - received = getSession().getQpidSession().messagesReceived(); + //received = getSession().getQpidSession().messagesReceived(); } - if (received == 0 && timeout < 0) + if (_messageReceived.get() && timeout < 0) { // this is a nowait and we havent received a message then we must immediatly return result = null; @@ -501,9 +507,12 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); getSession().getQpidSession().messageFlush(getMessageActorID()); - getSession().getQpidSession().sync(); - int received = getSession().getQpidSession().messagesReceived(); - if (received == 0 && _isNoWaitIsReceiving) + _messageReceived.set(false); + + // When sync() returns we know whether we have received a message or not. + getSession().getQpidSession().sync(); + + if (_messageReceived.get() && _isNoWaitIsReceiving) { // Right a message nowait is waiting for a message // but no one can be delivered it then need to return @@ -632,4 +641,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer getSession().testQpidException(); } } + + public void notifyMessageReceived() + { + _messageReceived.set(true); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java index ada5d048e1..4dbf86a388 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java @@ -62,6 +62,9 @@ public class QpidMessageListener implements MessageListener { try { + // to be used with flush + _consumer.notifyMessageReceived(); + //convert this message into a JMS one QpidMessage jmsMessage = MessageFactory.getQpidMessage(message); // if consumer is asynchronous then send this message to its session. -- cgit v1.2.1