summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java468
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java86
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java3
4 files changed, 205 insertions, 356 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
index 20a0542bf6..ac4f4dbd06 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
@@ -22,8 +22,8 @@ public class JMSTestCase
msg.writeInt(123);
prod.send(msg);
- javax.jms.Message m = cons.receive();
- System.out.println(m);
+ javax.jms.BytesMessage m = (javax.jms.BytesMessage)cons.receive();
+ System.out.println("Data : " + m.readInt());
}
catch(Exception e)
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index b8718d687c..0f7342f1ab 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -17,24 +17,30 @@
*/
package org.apache.qpidity.jms;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
-import org.apache.qpidity.jms.message.QpidMessage;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.QpidException;
import org.apache.qpidity.Option;
-import org.apache.qpidity.filter.MessageFilter;
-import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.RangeSet;
import org.apache.qpidity.client.MessagePartListener;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.exchange.ExchangeDefaults;
-
-import javax.jms.*;
+import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.filter.MessageFilter;
+import org.apache.qpidity.jms.message.MessageFactory;
+import org.apache.qpidity.jms.message.QpidMessage;
/**
* Implementation of JMS message consumer
*/
-public class MessageConsumerImpl extends MessageActor implements MessageConsumer
+public class MessageConsumerImpl extends MessageActor implements MessageConsumer, org.apache.qpidity.client.util.MessageListener
{
// we can receive up to 100 messages for an asynchronous listener
public static final int MAX_MESSAGE_TRANSFERRED = 100;
@@ -91,9 +97,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
* Nether exceed MAX_MESSAGE_TRANSFERRED
*/
private int _messageAsyncrhonouslyReceived = 0;
-
- private AtomicBoolean _messageReceived = new AtomicBoolean();
-
+
+ private LinkedBlockingQueue<QpidMessage> _queue = new LinkedBlockingQueue<QpidMessage>();
+
//----- Constructors
/**
* Create a new MessageProducerImpl.
@@ -120,11 +126,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
_subscriptionName = subscriptionName;
_isStopped = getSession().isStopped();
// let's create a message part assembler
- /**
- * A Qpid message listener that pushes messages to this consumer session when this consumer is
- * asynchronous or directly to this consumer when it is synchronously accessed.
- */
- MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this));
+
+ MessagePartListener messageAssembler = new MessagePartListenerAdapter(this);
if (destination instanceof Queue)
{
@@ -183,10 +186,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// this will prevent the broker from sending more than one message
// When a messageListener is set the flow will be adjusted.
// until then we assume it's for synchronous message consumption
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-
- getSession().getQpidSession().sync();
+ requestCredit(1);
+ requestSync();
// check for an exception
if (getSession().getCurrentException() != null)
{
@@ -266,9 +267,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
getSession().getQpidSession().messageStop(getMessageActorID());
}
_messageAsyncrhonouslyReceived = 0;
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- MAX_MESSAGE_TRANSFERRED);
+ requestCredit(MAX_MESSAGE_TRANSFERRED);
}
/**
@@ -281,7 +280,28 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
*/
public Message receive() throws JMSException
{
- return receive(0);
+ // Check if we can get a message immediately
+ Message result;
+ result = receiveNoWait();
+
+ if(result != null)
+ {
+ return result;
+ }
+
+ try
+ {
+ // Now issue a credit and wait for the broker to send a message
+ // IMO no point doing a credit() flush() and sync() in a loop.
+ // This will only overload the broker. After the initial try we can wait
+ // for the broker to send a message when it gets one
+ requestCredit(1);
+ return (Message)_queue.take();
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
}
/**
@@ -297,20 +317,35 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
*/
public Message receive(long timeout) throws JMSException
{
+ checkClosed();
+ checkIfListenerSet();
if (timeout < 0)
{
throw new JMSException("Invalid timeout value: " + timeout);
}
+
Message result;
try
{
- result = internalReceive(timeout);
+ // first check if we have any in the queue already
+ result = (Message)_queue.poll();
+ if(result == null)
+ {
+ requestCredit(1);
+ requestFlush();
+ // We shouldn't do a sync(). Bcos the timeout can happen
+ // before the sync() returns
+ return (Message)_queue.poll(timeout,TimeUnit.MILLISECONDS);
+ }
+ else
+ {
+ return result;
+ }
}
catch (Exception e)
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
- return result;
}
/**
@@ -321,138 +356,56 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
*/
public Message receiveNoWait() throws JMSException
{
+ checkClosed();
+ checkIfListenerSet();
Message result;
try
{
- result = internalReceive(-1);
+ // first check if we have any in the queue already
+ result = (Message)_queue.poll();
+ if(result == null)
+ {
+ requestCredit(1);
+ requestFlush();
+ requestSync();
+ return (Message)_queue.poll();
+ }
+ else
+ {
+ return result;
+ }
}
catch (Exception e)
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
- return result;
}
-
- // not public methods
-
- /**
- * Receive a synchronous message
- * <p> This call blocks until a message arrives, the timeout expires, or this message consumer
- * is closed.
- * <p> A timeout of zero never expires, and the call blocks indefinitely (unless this message consumer
- * is closed)
- * <p> A timeout less than 0 returns the next message or null if one is not available.
- *
- * @param timeout The timeout value (in milliseconds)
- * @return the next message or null if one is not available.
- * @throws Exception If receiving the next message fails due to some internal error.
- */
- private Message internalReceive(long timeout) throws Exception
+
+ // not public methods
+ private void requestCredit(int units)
{
- checkNotClosed();
- Message result = null;
-
- if (_messageListener != null)
- {
- throw new javax.jms.IllegalStateException("A listener has already been set.");
- }
-
- synchronized (_incomingMessageLock)
- {
- // This indicate to the delivery thread to deliver the message to this consumer
- // as it can happens that a message is delivered after a receive operation as returned.
- _isReceiving = true;
- boolean blockingReceived = timeout == 0;
- if (!_isStopped)
- {
- // if this consumer is stopped then this will be call when starting
- requestOneMessage();
- //When sync() returns we know whether we have received a message or not.
- System.out.println("Internal receive -- Called sync()");
- getSession().getQpidSession().sync();
- System.out.println("Internal receive -- Returned from sync()");
- }
- if (_messageReceived.get() && timeout < 0)
- {
- // this is a nowait and we havent received a message then we must immediatly return
- result = null;
- }
- else
- {
- boolean messageReceived = false;
- while (!messageReceived)
- {
- long timeBeforeWait = 0;
- while (_incomingMessage == null && !_isClosed)
- {
- if (!blockingReceived)
- {
- timeBeforeWait = System.currentTimeMillis();
- }
- try
- {
- _incomingMessageLock.wait(timeout);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
- if (_incomingMessage != null)
- {
- result = (Message) _incomingMessage;
- // tell the session that a message is inprocess
- getSession().preProcessMessage(_incomingMessage);
- // tell the session to acknowledge this message (if required)
- getSession().acknowledgeMessage(_incomingMessage);
- _incomingMessage.afterMessageReceive();
- messageReceived = true;
- }
- else
- {
- //now setup the new timeout
- if (!blockingReceived)
- {
- timeout = timeout - (System.currentTimeMillis() - timeBeforeWait);
- }
- if (!_isClosed)
- {
- // we need to request a new message
- requestOneMessage();
- getSession().getQpidSession().sync();
- if (_messageReceived.get() && timeout < 0)
- {
- // we are waiting for too long and we haven't received a proper message
- result = null;
- messageReceived = true;
- }
- }
- }
- }
- _incomingMessage = null;
- }
- // We now release any message received for this consumer
- _isReceiving = false;
- getSession().testQpidException();
- }
- return result;
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, units);
}
-
- /**
- * Request a single message
- */
- private void requestOneMessage()
+
+ private void requestFlush()
+ {
+ getSession().getQpidSession().messageFlush(getMessageActorID());
+ }
+
+ private void requestSync()
+ {
+ getSession().getQpidSession().sync();
+ }
+
+ private void checkClosed() throws JMSException
{
- if (_logger.isDebugEnabled())
+ if(_isStopped)
{
- _logger.debug("Requesting a single message");
+ throw new JMSException("Session is closed");
}
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
- getSession().getQpidSession().messageFlush(getMessageActorID());
- _messageReceived.set(false);
}
-
+
/**
* Stop the delivery of messages to this consumer.
* <p>For asynchronous receiver, this operation blocks until the message listener
@@ -475,93 +428,42 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
synchronized (_incomingMessageLock)
{
- _isStopped = false;
- if (_isReceiving)
- {
- // there is a synch call waiting for a message to be delivered
- // so tell the broker to deliver a message
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- 1);
- getSession().getQpidSession().messageFlush(getMessageActorID());
- }
+ _isStopped = false;
}
}
- /**
- * Deliver a message to this consumer.
- *
- * @param message The message delivered to this consumer.
- */
- protected synchronized void onMessage(QpidMessage message)
- {
+ public void onMessage(org.apache.qpidity.api.Message message)
+ {
try
{
- // if there is a message selector then we need to evaluate it.
- boolean messageOk = true;
- if (_messageSelector != null)
- {
- messageOk = _filter.matches((Message) message);
- }
-
- System.out.println("Received a message- onMessage in message consumer Impl");
- if (!messageOk && _preAcquire)
+ QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
+ if (_messageListener == null)
{
- // this is the case for topics
- // We need to ack this message
- System.out.println("onMessage - trying to ack message");
- acknowledgeMessage(message);
- System.out.println("onMessage - acked message");
+ _queue.offer(jmsMessage);
}
- // now we need to acquire this message if needed
- // this is the case of queue with a message selector set
- if (!_preAcquire && messageOk)
+ else
{
- System.out.println("onMessage - trying to acquire message");
- messageOk = acquireMessage(message);
- System.out.println("onMessage - acquired message");
+ // I still think we don't need that additional thread in SessionImpl
+ // if the Application blocks on a message thats fine
+ // getSession().dispatchMessage(getMessageActorID(), jmsMessage);
+ notifyMessageListener(jmsMessage);
}
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+
+ public void notifyMessageListener(QpidMessage message)throws RuntimeException
+ {
+ try
+ {
+ boolean messageOk = checkPreConditions(message);
- // if this consumer is synchronous then set the current message and
- // notify the waiting thread
- if (_messageListener == null)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Received a message- onMessage in message consumer Impl");
- }
- synchronized (_incomingMessageLock)
- {
- System.out.println("got incomming message lock");
- if (messageOk)
- {
- // we have received a proper message that we can deliver
- if (_isReceiving)
- {
- System.out.println("Is receiving true, setting message and notifying");
- _incomingMessage = message;
- _incomingMessageLock.notify();
- }
- else
- {
- // this message has been received after a received as returned
- // we need to release it
- releaseMessage(message);
- }
- }
- else
- {
- // oups the message did not match the selector or we did not manage to acquire it
- // If the receiver is still waiting for a message
- // then we need to request a new one from the server
- if (_isReceiving)
- {
- _incomingMessageLock.notify();
- }
- }
- }
- }
- else
+ // only deliver the message if it is valid
+ if (messageOk)
{
_messageAsyncrhonouslyReceived++;
if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
@@ -569,56 +471,93 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages
resetAsynchMessageReceived();
}
- // only deliver the message if it is valid
- if (messageOk)
+
+ preApplicationProcessing(message);
+ // The JMS specs says:
+ /* The result of a listener throwing a RuntimeException depends on the session?s
+ * acknowledgment mode.
+ ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
+ * will be immediately redelivered. The number of times a JMS provider will
+ * redeliver the same message before giving up is provider-dependent.
+ ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
+ * --- Transacted Session - the next message for the listener is delivered.
+ *
+ * The number of time we try redelivering the message is 0
+ **/
+ try
+ {
+
+ _messageListener.onMessage((Message) message);
+ }
+ catch (RuntimeException re)
{
- // This is an asynchronous message
- // tell the session that a message is in process
- getSession().preProcessMessage(message);
- // If the session is transacted we need to ack the message first
- // This is because a message is associated with its tx only when acked
- if (getSession().getTransacted())
- {
- getSession().acknowledgeMessage(message);
- }
- // The JMS specs says:
- /* The result of a listener throwing a RuntimeException depends on the session?s
- * acknowledgment mode.
- ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
- * will be immediately redelivered. The number of times a JMS provider will
- * redeliver the same message before giving up is provider-dependent.
- ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
- * --- Transacted Session - the next message for the listener is delivered.
- *
- * The number of time we try redelivering the message is 0
- **/
- try
- {
- message.afterMessageReceive();
- _messageListener.onMessage((Message) message);
- }
- catch (RuntimeException re)
- {
- // do nothing as this message will not be redelivered
- }
- // If the session has been recovered we then need to redelivered this message
- if (getSession().isInRecovery())
- {
- releaseMessage(message);
- }
- else if (!getSession().getTransacted())
- {
- // Tell the jms Session to ack this message if required
- getSession().acknowledgeMessage(message);
- }
+ // do nothing as this message will not be redelivered
}
}
+
}
catch (Exception e)
{
throw new RuntimeException(e.getMessage());
}
}
+
+ private void checkIfListenerSet() throws javax.jms.IllegalStateException
+ {
+
+ if (_messageListener != null)
+ {
+ throw new javax.jms.IllegalStateException("A listener has already been set.");
+ }
+ }
+
+ private void preApplicationProcessing(QpidMessage message)throws Exception
+ {
+ getSession().preProcessMessage(message);
+ // If the session is transacted we need to ack the message first
+ // This is because a message is associated with its tx only when acked
+ if (getSession().getTransacted())
+ {
+ getSession().acknowledgeMessage(message);
+ }
+ message.afterMessageReceive();
+ }
+
+ private boolean checkPreConditions(QpidMessage message)throws QpidException
+ {
+ boolean messageOk = true;
+ if (_messageSelector != null)
+ {
+ messageOk = _filter.matches((Message) message);
+ if (!messageOk)
+ {
+ System.out.println("Message not OK, releasing");
+ releaseMessage(message);
+ }
+ }
+
+ System.out.println("messageOk " + messageOk);
+ System.out.println("_preAcquire " + _preAcquire);
+
+ if (!messageOk && _preAcquire)
+ {
+ // this is the case for topics
+ // We need to ack this message
+ System.out.println("filterMessage - trying to ack message");
+ acknowledgeMessage(message);
+ System.out.println("filterMessage - acked message");
+ }
+ // now we need to acquire this message if needed
+ // this is the case of queue with a message selector set
+ if (!_preAcquire && messageOk)
+ {
+ System.out.println("filterMessage - trying to acquire message");
+ messageOk = acquireMessage(message);
+ System.out.println("filterMessage - acquired message");
+ }
+
+ return messageOk;
+ }
/**
* Release a message
@@ -681,9 +620,4 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
getSession().testQpidException();
}
}
-
- public void notifyMessageReceived()
- {
- _messageReceived.set(true);
- }
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
deleted file mode 100644
index 4dbf86a388..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.jms;
-
-import org.apache.qpidity.jms.message.QpidMessage;
-import org.apache.qpidity.jms.message.MessageFactory;
-import org.apache.qpidity.api.Message;
-import org.apache.qpidity.client.util.MessageListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p> When asynchronous, upon receive of a message this listener delegate the dispatching to its session.
- * This is for guarantying that asynch messages are sequentially processed within their session.
- * <p> when used synchonously, messages are dispatched to the receiver itself.
- */
-public class QpidMessageListener implements MessageListener
-{
- /**
- * Used for debugging.
- */
- private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
-
- /**
- * This message listener consumer
- */
- MessageConsumerImpl _consumer = null;
-
- //---- constructor
- /**
- * Create a message listener wrapper for a given consumer
- *
- * @param consumer The consumer of this listener
- */
- public QpidMessageListener(MessageConsumerImpl consumer)
- {
- _consumer = consumer;
- }
-
- //---- org.apache.qpidity.MessagePartListener API
- /**
- * Deliver a message to the listener.
- *
- * @param message The message delivered to the listner.
- */
- public void onMessage(Message message)
- {
- try
- {
- // to be used with flush
- _consumer.notifyMessageReceived();
-
- //convert this message into a JMS one
- QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
- // if consumer is asynchronous then send this message to its session.
- if( _consumer.getMessageListener() != null )
- {
- _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage);
- }
- else
- {
- // deliver this message to the consumer itself
- _consumer.onMessage(jmsMessage);
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage());
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
index f08fdba373..010a43ab41 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
@@ -1301,7 +1301,8 @@ public class SessionImpl implements Session
{
try
{
- mc.onMessage(message.getMessage());
+ // mc.onMessage(message.getMessage());
+ mc.notifyMessageListener(message.getMessage());
}
catch (RuntimeException t)
{