summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-08-25 15:31:30 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-08-25 15:31:30 +0000
commiteeca08fbcf9404cb573b2f015dbfe402025acaae (patch)
tree3c818117256da33479767656a135f190a5518630 /java
parentaa0c3083fcb4ddb93d3ef39452005715673f1c8d (diff)
downloadqpid-python-eeca08fbcf9404cb573b2f015dbfe402025acaae.tar.gz
I provided a fixed to the deadlock issue in MessageConsumerImpl.
Here is the deadlock issue --------------------------- The internal receive thread acquires the _incomingMessageLock and blocks on sync() The MINA thread gets on to onMessage() and blocks while trying to acquire the incomingMessageLock Since the MINA thread doesn't return it can't process the execution.complete() sent by the broker. Since the execution.complete doesn't get processed, the sync() doesn't return. Hence the deadlock. Solution ---------- I rewrote the receive logic using a LinkedBlockingQueue and leveraging the application thread that calls receive methods git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@569688 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java4
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java468
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java86
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java3
4 files changed, 205 insertions, 356 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
index 20a0542bf6..ac4f4dbd06 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
@@ -22,8 +22,8 @@ public class JMSTestCase
msg.writeInt(123);
prod.send(msg);
- javax.jms.Message m = cons.receive();
- System.out.println(m);
+ javax.jms.BytesMessage m = (javax.jms.BytesMessage)cons.receive();
+ System.out.println("Data : " + m.readInt());
}
catch(Exception e)
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index b8718d687c..0f7342f1ab 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -17,24 +17,30 @@
*/
package org.apache.qpidity.jms;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
-import org.apache.qpidity.jms.message.QpidMessage;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.QpidException;
import org.apache.qpidity.Option;
-import org.apache.qpidity.filter.MessageFilter;
-import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.RangeSet;
import org.apache.qpidity.client.MessagePartListener;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.exchange.ExchangeDefaults;
-
-import javax.jms.*;
+import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.filter.MessageFilter;
+import org.apache.qpidity.jms.message.MessageFactory;
+import org.apache.qpidity.jms.message.QpidMessage;
/**
* Implementation of JMS message consumer
*/
-public class MessageConsumerImpl extends MessageActor implements MessageConsumer
+public class MessageConsumerImpl extends MessageActor implements MessageConsumer, org.apache.qpidity.client.util.MessageListener
{
// we can receive up to 100 messages for an asynchronous listener
public static final int MAX_MESSAGE_TRANSFERRED = 100;
@@ -91,9 +97,9 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
* Nether exceed MAX_MESSAGE_TRANSFERRED
*/
private int _messageAsyncrhonouslyReceived = 0;
-
- private AtomicBoolean _messageReceived = new AtomicBoolean();
-
+
+ private LinkedBlockingQueue<QpidMessage> _queue = new LinkedBlockingQueue<QpidMessage>();
+
//----- Constructors
/**
* Create a new MessageProducerImpl.
@@ -120,11 +126,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
_subscriptionName = subscriptionName;
_isStopped = getSession().isStopped();
// let's create a message part assembler
- /**
- * A Qpid message listener that pushes messages to this consumer session when this consumer is
- * asynchronous or directly to this consumer when it is synchronously accessed.
- */
- MessagePartListener messageAssembler = new MessagePartListenerAdapter(new QpidMessageListener(this));
+
+ MessagePartListener messageAssembler = new MessagePartListenerAdapter(this);
if (destination instanceof Queue)
{
@@ -183,10 +186,8 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// this will prevent the broker from sending more than one message
// When a messageListener is set the flow will be adjusted.
// until then we assume it's for synchronous message consumption
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-
- getSession().getQpidSession().sync();
+ requestCredit(1);
+ requestSync();
// check for an exception
if (getSession().getCurrentException() != null)
{
@@ -266,9 +267,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
getSession().getQpidSession().messageStop(getMessageActorID());
}
_messageAsyncrhonouslyReceived = 0;
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- MAX_MESSAGE_TRANSFERRED);
+ requestCredit(MAX_MESSAGE_TRANSFERRED);
}
/**
@@ -281,7 +280,28 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
*/
public Message receive() throws JMSException
{
- return receive(0);
+ // Check if we can get a message immediately
+ Message result;
+ result = receiveNoWait();
+
+ if(result != null)
+ {
+ return result;
+ }
+
+ try
+ {
+ // Now issue a credit and wait for the broker to send a message
+ // IMO no point doing a credit() flush() and sync() in a loop.
+ // This will only overload the broker. After the initial try we can wait
+ // for the broker to send a message when it gets one
+ requestCredit(1);
+ return (Message)_queue.take();
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
}
/**
@@ -297,20 +317,35 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
*/
public Message receive(long timeout) throws JMSException
{
+ checkClosed();
+ checkIfListenerSet();
if (timeout < 0)
{
throw new JMSException("Invalid timeout value: " + timeout);
}
+
Message result;
try
{
- result = internalReceive(timeout);
+ // first check if we have any in the queue already
+ result = (Message)_queue.poll();
+ if(result == null)
+ {
+ requestCredit(1);
+ requestFlush();
+ // We shouldn't do a sync(). Bcos the timeout can happen
+ // before the sync() returns
+ return (Message)_queue.poll(timeout,TimeUnit.MILLISECONDS);
+ }
+ else
+ {
+ return result;
+ }
}
catch (Exception e)
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
- return result;
}
/**
@@ -321,138 +356,56 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
*/
public Message receiveNoWait() throws JMSException
{
+ checkClosed();
+ checkIfListenerSet();
Message result;
try
{
- result = internalReceive(-1);
+ // first check if we have any in the queue already
+ result = (Message)_queue.poll();
+ if(result == null)
+ {
+ requestCredit(1);
+ requestFlush();
+ requestSync();
+ return (Message)_queue.poll();
+ }
+ else
+ {
+ return result;
+ }
}
catch (Exception e)
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
- return result;
}
-
- // not public methods
-
- /**
- * Receive a synchronous message
- * <p> This call blocks until a message arrives, the timeout expires, or this message consumer
- * is closed.
- * <p> A timeout of zero never expires, and the call blocks indefinitely (unless this message consumer
- * is closed)
- * <p> A timeout less than 0 returns the next message or null if one is not available.
- *
- * @param timeout The timeout value (in milliseconds)
- * @return the next message or null if one is not available.
- * @throws Exception If receiving the next message fails due to some internal error.
- */
- private Message internalReceive(long timeout) throws Exception
+
+ // not public methods
+ private void requestCredit(int units)
{
- checkNotClosed();
- Message result = null;
-
- if (_messageListener != null)
- {
- throw new javax.jms.IllegalStateException("A listener has already been set.");
- }
-
- synchronized (_incomingMessageLock)
- {
- // This indicate to the delivery thread to deliver the message to this consumer
- // as it can happens that a message is delivered after a receive operation as returned.
- _isReceiving = true;
- boolean blockingReceived = timeout == 0;
- if (!_isStopped)
- {
- // if this consumer is stopped then this will be call when starting
- requestOneMessage();
- //When sync() returns we know whether we have received a message or not.
- System.out.println("Internal receive -- Called sync()");
- getSession().getQpidSession().sync();
- System.out.println("Internal receive -- Returned from sync()");
- }
- if (_messageReceived.get() && timeout < 0)
- {
- // this is a nowait and we havent received a message then we must immediatly return
- result = null;
- }
- else
- {
- boolean messageReceived = false;
- while (!messageReceived)
- {
- long timeBeforeWait = 0;
- while (_incomingMessage == null && !_isClosed)
- {
- if (!blockingReceived)
- {
- timeBeforeWait = System.currentTimeMillis();
- }
- try
- {
- _incomingMessageLock.wait(timeout);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
- if (_incomingMessage != null)
- {
- result = (Message) _incomingMessage;
- // tell the session that a message is inprocess
- getSession().preProcessMessage(_incomingMessage);
- // tell the session to acknowledge this message (if required)
- getSession().acknowledgeMessage(_incomingMessage);
- _incomingMessage.afterMessageReceive();
- messageReceived = true;
- }
- else
- {
- //now setup the new timeout
- if (!blockingReceived)
- {
- timeout = timeout - (System.currentTimeMillis() - timeBeforeWait);
- }
- if (!_isClosed)
- {
- // we need to request a new message
- requestOneMessage();
- getSession().getQpidSession().sync();
- if (_messageReceived.get() && timeout < 0)
- {
- // we are waiting for too long and we haven't received a proper message
- result = null;
- messageReceived = true;
- }
- }
- }
- }
- _incomingMessage = null;
- }
- // We now release any message received for this consumer
- _isReceiving = false;
- getSession().testQpidException();
- }
- return result;
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, units);
}
-
- /**
- * Request a single message
- */
- private void requestOneMessage()
+
+ private void requestFlush()
+ {
+ getSession().getQpidSession().messageFlush(getMessageActorID());
+ }
+
+ private void requestSync()
+ {
+ getSession().getQpidSession().sync();
+ }
+
+ private void checkClosed() throws JMSException
{
- if (_logger.isDebugEnabled())
+ if(_isStopped)
{
- _logger.debug("Requesting a single message");
+ throw new JMSException("Session is closed");
}
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
- getSession().getQpidSession().messageFlush(getMessageActorID());
- _messageReceived.set(false);
}
-
+
/**
* Stop the delivery of messages to this consumer.
* <p>For asynchronous receiver, this operation blocks until the message listener
@@ -475,93 +428,42 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
synchronized (_incomingMessageLock)
{
- _isStopped = false;
- if (_isReceiving)
- {
- // there is a synch call waiting for a message to be delivered
- // so tell the broker to deliver a message
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- 1);
- getSession().getQpidSession().messageFlush(getMessageActorID());
- }
+ _isStopped = false;
}
}
- /**
- * Deliver a message to this consumer.
- *
- * @param message The message delivered to this consumer.
- */
- protected synchronized void onMessage(QpidMessage message)
- {
+ public void onMessage(org.apache.qpidity.api.Message message)
+ {
try
{
- // if there is a message selector then we need to evaluate it.
- boolean messageOk = true;
- if (_messageSelector != null)
- {
- messageOk = _filter.matches((Message) message);
- }
-
- System.out.println("Received a message- onMessage in message consumer Impl");
- if (!messageOk && _preAcquire)
+ QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
+ if (_messageListener == null)
{
- // this is the case for topics
- // We need to ack this message
- System.out.println("onMessage - trying to ack message");
- acknowledgeMessage(message);
- System.out.println("onMessage - acked message");
+ _queue.offer(jmsMessage);
}
- // now we need to acquire this message if needed
- // this is the case of queue with a message selector set
- if (!_preAcquire && messageOk)
+ else
{
- System.out.println("onMessage - trying to acquire message");
- messageOk = acquireMessage(message);
- System.out.println("onMessage - acquired message");
+ // I still think we don't need that additional thread in SessionImpl
+ // if the Application blocks on a message thats fine
+ // getSession().dispatchMessage(getMessageActorID(), jmsMessage);
+ notifyMessageListener(jmsMessage);
}
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+
+ public void notifyMessageListener(QpidMessage message)throws RuntimeException
+ {
+ try
+ {
+ boolean messageOk = checkPreConditions(message);
- // if this consumer is synchronous then set the current message and
- // notify the waiting thread
- if (_messageListener == null)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Received a message- onMessage in message consumer Impl");
- }
- synchronized (_incomingMessageLock)
- {
- System.out.println("got incomming message lock");
- if (messageOk)
- {
- // we have received a proper message that we can deliver
- if (_isReceiving)
- {
- System.out.println("Is receiving true, setting message and notifying");
- _incomingMessage = message;
- _incomingMessageLock.notify();
- }
- else
- {
- // this message has been received after a received as returned
- // we need to release it
- releaseMessage(message);
- }
- }
- else
- {
- // oups the message did not match the selector or we did not manage to acquire it
- // If the receiver is still waiting for a message
- // then we need to request a new one from the server
- if (_isReceiving)
- {
- _incomingMessageLock.notify();
- }
- }
- }
- }
- else
+ // only deliver the message if it is valid
+ if (messageOk)
{
_messageAsyncrhonouslyReceived++;
if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
@@ -569,56 +471,93 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages
resetAsynchMessageReceived();
}
- // only deliver the message if it is valid
- if (messageOk)
+
+ preApplicationProcessing(message);
+ // The JMS specs says:
+ /* The result of a listener throwing a RuntimeException depends on the session?s
+ * acknowledgment mode.
+ ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
+ * will be immediately redelivered. The number of times a JMS provider will
+ * redeliver the same message before giving up is provider-dependent.
+ ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
+ * --- Transacted Session - the next message for the listener is delivered.
+ *
+ * The number of time we try redelivering the message is 0
+ **/
+ try
+ {
+
+ _messageListener.onMessage((Message) message);
+ }
+ catch (RuntimeException re)
{
- // This is an asynchronous message
- // tell the session that a message is in process
- getSession().preProcessMessage(message);
- // If the session is transacted we need to ack the message first
- // This is because a message is associated with its tx only when acked
- if (getSession().getTransacted())
- {
- getSession().acknowledgeMessage(message);
- }
- // The JMS specs says:
- /* The result of a listener throwing a RuntimeException depends on the session?s
- * acknowledgment mode.
- ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
- * will be immediately redelivered. The number of times a JMS provider will
- * redeliver the same message before giving up is provider-dependent.
- ? --- CLIENT_ACKNOWLEDGE - the next message for the listener is delivered.
- * --- Transacted Session - the next message for the listener is delivered.
- *
- * The number of time we try redelivering the message is 0
- **/
- try
- {
- message.afterMessageReceive();
- _messageListener.onMessage((Message) message);
- }
- catch (RuntimeException re)
- {
- // do nothing as this message will not be redelivered
- }
- // If the session has been recovered we then need to redelivered this message
- if (getSession().isInRecovery())
- {
- releaseMessage(message);
- }
- else if (!getSession().getTransacted())
- {
- // Tell the jms Session to ack this message if required
- getSession().acknowledgeMessage(message);
- }
+ // do nothing as this message will not be redelivered
}
}
+
}
catch (Exception e)
{
throw new RuntimeException(e.getMessage());
}
}
+
+ private void checkIfListenerSet() throws javax.jms.IllegalStateException
+ {
+
+ if (_messageListener != null)
+ {
+ throw new javax.jms.IllegalStateException("A listener has already been set.");
+ }
+ }
+
+ private void preApplicationProcessing(QpidMessage message)throws Exception
+ {
+ getSession().preProcessMessage(message);
+ // If the session is transacted we need to ack the message first
+ // This is because a message is associated with its tx only when acked
+ if (getSession().getTransacted())
+ {
+ getSession().acknowledgeMessage(message);
+ }
+ message.afterMessageReceive();
+ }
+
+ private boolean checkPreConditions(QpidMessage message)throws QpidException
+ {
+ boolean messageOk = true;
+ if (_messageSelector != null)
+ {
+ messageOk = _filter.matches((Message) message);
+ if (!messageOk)
+ {
+ System.out.println("Message not OK, releasing");
+ releaseMessage(message);
+ }
+ }
+
+ System.out.println("messageOk " + messageOk);
+ System.out.println("_preAcquire " + _preAcquire);
+
+ if (!messageOk && _preAcquire)
+ {
+ // this is the case for topics
+ // We need to ack this message
+ System.out.println("filterMessage - trying to ack message");
+ acknowledgeMessage(message);
+ System.out.println("filterMessage - acked message");
+ }
+ // now we need to acquire this message if needed
+ // this is the case of queue with a message selector set
+ if (!_preAcquire && messageOk)
+ {
+ System.out.println("filterMessage - trying to acquire message");
+ messageOk = acquireMessage(message);
+ System.out.println("filterMessage - acquired message");
+ }
+
+ return messageOk;
+ }
/**
* Release a message
@@ -681,9 +620,4 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
getSession().testQpidException();
}
}
-
- public void notifyMessageReceived()
- {
- _messageReceived.set(true);
- }
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java b/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
deleted file mode 100644
index 4dbf86a388..0000000000
--- a/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpidity.jms;
-
-import org.apache.qpidity.jms.message.QpidMessage;
-import org.apache.qpidity.jms.message.MessageFactory;
-import org.apache.qpidity.api.Message;
-import org.apache.qpidity.client.util.MessageListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p> When asynchronous, upon receive of a message this listener delegate the dispatching to its session.
- * This is for guarantying that asynch messages are sequentially processed within their session.
- * <p> when used synchonously, messages are dispatched to the receiver itself.
- */
-public class QpidMessageListener implements MessageListener
-{
- /**
- * Used for debugging.
- */
- private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
-
- /**
- * This message listener consumer
- */
- MessageConsumerImpl _consumer = null;
-
- //---- constructor
- /**
- * Create a message listener wrapper for a given consumer
- *
- * @param consumer The consumer of this listener
- */
- public QpidMessageListener(MessageConsumerImpl consumer)
- {
- _consumer = consumer;
- }
-
- //---- org.apache.qpidity.MessagePartListener API
- /**
- * Deliver a message to the listener.
- *
- * @param message The message delivered to the listner.
- */
- public void onMessage(Message message)
- {
- try
- {
- // to be used with flush
- _consumer.notifyMessageReceived();
-
- //convert this message into a JMS one
- QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
- // if consumer is asynchronous then send this message to its session.
- if( _consumer.getMessageListener() != null )
- {
- _consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), jmsMessage);
- }
- else
- {
- // deliver this message to the consumer itself
- _consumer.onMessage(jmsMessage);
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage());
- }
- }
-}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
index f08fdba373..010a43ab41 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
@@ -1301,7 +1301,8 @@ public class SessionImpl implements Session
{
try
{
- mc.onMessage(message.getMessage());
+ // mc.onMessage(message.getMessage());
+ mc.notifyMessageListener(message.getMessage());
}
catch (RuntimeException t)
{