summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-03-14 12:57:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-03-14 12:57:42 +0000
commit21d5aa6534515bc7c1f343b5a4b579fe9513b0ce (patch)
treef6f6dcdac4d93ebc28f53fa016b2754a71d62982 /java/client
parent92a03fa08a503afb6af1988862393a86813d8482 (diff)
downloadqpid-python-21d5aa6534515bc7c1f343b5a4b579fe9513b0ce.tar.gz
QPID-854 : Changes to the client to make the dispatcher responsible for closing the queue browser when all the messages have been processed.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637086 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java51
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java45
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java157
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java174
5 files changed, 297 insertions, 135 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
index 4171e9bf9b..a3cf39003d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.AMQException;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
@@ -50,7 +51,9 @@ public class AMQQueueBrowser implements QueueBrowser
_messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector;
// Create Consumer to verify message selector.
BasicMessageConsumer consumer =
- (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ // Close this consumer as we are not looking to consume only to establish that, at least for now,
+ // the QB can be created
consumer.close();
}
@@ -88,40 +91,40 @@ public class AMQQueueBrowser implements QueueBrowser
checkState();
final BasicMessageConsumer consumer =
(BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
- consumer.closeWhenNoMessages(true);
+
_consumers.add(consumer);
return new Enumeration()
+ {
+
+ Message _nextMessage = consumer == null ? null : consumer.receive();
+
+ public boolean hasMoreElements()
{
+ _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
- Message _nextMessage = consumer.receive();
+ return (_nextMessage != null);
+ }
- public boolean hasMoreElements()
+ public Object nextElement()
+ {
+ Message msg = _nextMessage;
+ try
{
- _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
+ _logger.info("QB:nextElement about to receive");
- return (_nextMessage != null);
+ _nextMessage = consumer.receive();
+ _logger.info("QB:nextElement received:" + _nextMessage);
}
-
- public Object nextElement()
+ catch (JMSException e)
{
- Message msg = _nextMessage;
- try
- {
- _logger.info("QB:nextElement about to receive");
-
- _nextMessage = consumer.receive();
- _logger.info("QB:nextElement received:" + _nextMessage);
- }
- catch (JMSException e)
- {
- _logger.warn("Exception caught while queue browsing", e);
- _nextMessage = null;
- }
-
- return msg;
+ _logger.warn("Exception caught while queue browsing", e);
+ _nextMessage = null;
}
- };
+
+ return msg;
+ }
+ };
}
public void close() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 9f7f53a011..0a51ec7c47 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -648,6 +648,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void closed(Throwable e) throws JMSException
{
+ // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
+ // calls through connection.closeAllSessions which is also called by the public connection.close()
+ // with a null cause
+ // When we are closing the Session due to a protocol session error we simply create a new AMQException
+ // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
+ // We need to determin here if the connection should be
+
synchronized (_connection.getFailoverMutex())
{
if (e instanceof AMQDisconnectedException)
@@ -763,13 +770,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
if (consumer != null)
{
- // fixme this isn't right.. needs to check if _queue contains data for this consumer
- if (consumer.isAutoClose()) // && _queue.isEmpty())
- {
- consumer.closeWhenNoMessages(true);
- }
-
- if (!consumer.isNoConsume())
+ if (!consumer.isNoConsume()) // Normal Consumer
{
// Clean the Maps up first
// Flush any pending messages for this consumerTag
@@ -785,7 +786,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_dispatcher.rejectPending(consumer);
}
- else
+ else // Queue Browser
{
// Just close the consumer
// fixme the CancelOK is being processed before the arriving messages..
@@ -793,13 +794,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// has yet to receive before the close comes in.
// consumer.markClosed();
+
+
+
+ if (consumer.isAutoClose())
+ { // There is a small window where the message is between the two queues in the dispatcher.
+ if (consumer.isClosed())
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing consumer:" + consumer.debugIdentity());
+ }
+
+ deregisterConsumer(consumer);
+
+ }
+ else
+ {
+ _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer));
+ }
+ }
}
}
- else
- {
- _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map.");
- }
-
}
public QueueBrowser createBrowser(Queue queue) throws JMSException
@@ -2934,7 +2950,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_lock.wait(2000);
}
- if (message.getDeliverBody().getDeliveryTag() <= _rollbackMark.get())
+ if (!(message instanceof UnprocessedMessage.CloseConsumerMessage)
+ && (message.getDeliverBody().getDeliveryTag() <= _rollbackMark.get()))
{
rejectMessage(message, true);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 605e2d1e83..efbce6033b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -38,6 +38,7 @@ import javax.jms.MessageListener;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -121,7 +122,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* on the queue. This is used for queue browsing.
*/
private final boolean _autoClose;
- private boolean _closeWhenNoMessages;
private final boolean _noConsume;
private List<StackTraceElement> _closedStack = null;
@@ -358,7 +358,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
- _logger.warn("Interrupted: " + e);
+ _logger.warn("Interrupted acquire: " + e);
if (isClosed())
{
return null;
@@ -369,11 +369,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
- if (closeOnAutoClose())
- {
- return null;
- }
-
Object o = null;
if (l > 0)
{
@@ -386,7 +381,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
- _logger.warn("Interrupted: " + e);
+ _logger.warn("Interrupted poll: " + e);
if (isClosed())
{
return null;
@@ -404,7 +399,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
- _logger.warn("Interrupted: " + e);
+ _logger.warn("Interrupted take: " + e);
if (isClosed())
{
return null;
@@ -426,20 +421,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- private boolean closeOnAutoClose() throws JMSException
- {
- if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
- {
- close(false);
-
- return true;
- }
- else
- {
- return false;
- }
- }
-
public Message receiveNoWait() throws JMSException
{
checkPreConditions();
@@ -468,11 +449,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
- if (closeOnAutoClose())
- {
- return null;
- }
-
Object o = _synchronousQueue.poll();
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
@@ -513,6 +489,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
throw e;
}
+ else if (o instanceof UnprocessedMessage.CloseConsumerMessage)
+ {
+ _closed.set(true);
+ deregisterConsumer();
+ return null;
+ }
else
{
return (AbstractJMSMessage) o;
@@ -526,31 +508,30 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void close(boolean sendClose) throws JMSException
{
- // synchronized (_closed)
-
if (_logger.isInfoEnabled())
{
_logger.info("Closing consumer:" + debugIdentity());
}
- synchronized (_connection.getFailoverMutex())
+ if (!_closed.getAndSet(true))
{
- if (!_closed.getAndSet(true))
+ if (_logger.isDebugEnabled())
{
- if (_logger.isDebugEnabled())
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ if (_closedStack != null)
{
- StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
- if (_closedStack != null)
- {
- _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
- }
- else
- {
- _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
- }
+ _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
}
+ else
+ {
+ _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
+ }
+ }
- if (sendClose)
+ if (sendClose)
+ {
+ // The Synchronized block only needs to protect network traffic.
+ synchronized (_connection.getFailoverMutex())
{
BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false);
@@ -564,7 +545,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.debug("CancelOk'd for consumer:" + debugIdentity());
}
-
}
catch (AMQException e)
{
@@ -575,24 +555,26 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
}
}
- else
- {
- // //fixme this probably is not right
- // if (!isNoConsume())
- { // done in BasicCancelOK Handler but not sending one so just deregister.
- deregisterConsumer();
- }
+ }
+ else
+ {
+ // //fixme this probably is not right
+ // if (!isNoConsume())
+ { // done in BasicCancelOK Handler but not sending one so just deregister.
+ deregisterConsumer();
}
+ }
- if ((_messageListener != null) && _receiving.get())
+ // This will occur if session.close is called closing all consumers we may be blocked waiting for a receive
+ // so we need to let it know it is time to close.
+ if ((_messageListener != null) && _receiving.get())
+ {
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Interrupting thread: " + _receivingThread);
- }
-
- _receivingThread.interrupt();
+ _logger.info("Interrupting thread: " + _receivingThread);
}
+
+ _receivingThread.interrupt();
}
}
}
@@ -634,6 +616,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
void notifyMessage(UnprocessedMessage messageFrame)
{
+ if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage)
+ {
+ notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame);
+ return;
+ }
+
final boolean debug = _logger.isDebugEnabled();
if (debug)
@@ -646,12 +634,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
final BasicDeliverBody deliverBody = messageFrame.getDeliverBody();
AbstractJMSMessage jmsMessage =
- _messageFactory.createMessage(deliverBody.getDeliveryTag(),
- deliverBody.getRedelivered(),
- deliverBody.getExchange(),
- deliverBody.getRoutingKey(),
- messageFrame.getContentHeader(),
- messageFrame.getBodies());
+ _messageFactory.createMessage(deliverBody.getDeliveryTag(),
+ deliverBody.getRedelivered(),
+ deliverBody.getExchange(),
+ deliverBody.getRoutingKey(),
+ messageFrame.getContentHeader(),
+ messageFrame.getBodies());
if (debug)
{
@@ -688,9 +676,32 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- /**
- * @param jmsMessage this message has already been processed so can't redo preDeliver
- */
+ /** @param closeMessage this message signals that we should close the browser */
+ public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage)
+ {
+ if (isMessageListenerSet())
+ {
+ // Currently only possible to get this msg type with a browser.
+ // If we get the message here then we should probably just close this consumer.
+ // Though an AutoClose consumer with message listener is quite odd...
+ // Just log out the fact so we know where we are
+ _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
+ }
+ else
+ {
+ try
+ {
+ _synchronousQueue.put(closeMessage);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing," +
+ "but we shouldn't have close yet");
+ }
+ }
+ }
+
+ /** @param jmsMessage this message has already been processed so can't redo preDeliver */
public void notifyMessage(AbstractJMSMessage jmsMessage)
{
try
@@ -913,18 +924,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
return _noConsume;
}
- public void closeWhenNoMessages(boolean b)
- {
- _closeWhenNoMessages = b;
-
- if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null))
- {
- _closed.set(true);
- _receivingThread.interrupt();
- }
-
- }
-
public void rollback()
{
clearUnackedMessages();
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index 8c8814e9b7..a580a6466d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -104,6 +104,11 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann
}
// fixme why is this only done when the close is expected...
// should the above forced closes not also cause a close?
+ // ----------
+ // Closing the session only when it is expected allows the errors to be processed
+ // Calling this here will prevent failover. So we should do this for all exceptions
+ // that should never cause failover. Such as authentication errors.
+
session.channelClosed(channelId, errorCode, String.valueOf(reason));
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index bc1ba155cb..18157adc34 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,10 +24,20 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
* This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
@@ -128,31 +138,159 @@ public abstract class UnprocessedMessage
}
public static final class UnprocessedBouncedMessage extends UnprocessedMessage
+ {
+ private final BasicReturnBody _body;
+
+ public UnprocessedBouncedMessage(final BasicReturnBody body)
{
- private final BasicReturnBody _body;
+ _body = body;
+ }
- public UnprocessedBouncedMessage(final BasicReturnBody body)
- {
- _body = body;
- }
+ public BasicDeliverBody getDeliverBody()
+ {
+ return null;
+ }
- public BasicDeliverBody getDeliverBody()
- {
- return null;
- }
+ public BasicReturnBody getBounceBody()
+ {
+ return _body;
+ }
- public BasicReturnBody getBounceBody()
- {
- return _body;
- }
+ public boolean isDeliverMessage()
+ {
+ return false;
+ }
+ }
+
+ public static final class CloseConsumerMessage extends UnprocessedMessage
+ {
+ BasicMessageConsumer _consumer;
- public boolean isDeliverMessage()
+ public CloseConsumerMessage(BasicMessageConsumer consumer)
+ {
+ _consumer = consumer;
+ }
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return new BasicDeliverBody()
{
- return false;
- }
+ // This is the only thing we need to preserve so the correct consumer can be found later.
+ public AMQShortString getConsumerTag()
+ {
+ return _consumer.getConsumerTag();
+ }
+
+ // The Rest of these methods are not used
+ public long getDeliveryTag()
+ {
+ return 0;
+ }
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean getRedelivered()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+
+ public byte getMajor()
+ {
+ return 0;
+ }
+
+ public byte getMinor()
+ {
+ return 0;
+ }
+
+ public int getClazz()
+ {
+ return 0;
+ }
+
+ public int getMethod()
+ {
+ return 0;
+ }
+
+ public void writeMethodPayload(ByteBuffer buffer)
+ {
+ }
+
+ public byte getFrameType()
+ {
+ return 0;
+ }
+
+ public int getSize()
+ {
+ return 0;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException
+ {
+ }
+
+ public AMQFrame generateFrame(int channelId)
+ {
+ return null;
+ }
+
+ public AMQChannelException getChannelNotFoundException(int channelId)
+ {
+ return null;
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message)
+ {
+ return null;
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
+ {
+ return null;
+ }
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message)
+ {
+ return null;
+ }
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
+ {
+ return null;
+ }
+
+ public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException
+ {
+ return false;
+ }
+ };
}
+ public BasicReturnBody getBounceBody()
+ {
+ return null;
+ }
+ public boolean isDeliverMessage()
+ {
+ return false;
+ }
+ }
}