summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-03-13 10:35:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-03-13 10:35:42 +0000
commitc4f018f7c10d2169ced4c59e776844ee5cf52d33 (patch)
tree421d85cf41bf382bab618587298d9d6bef825bdc /java/client/src
parent685ad5615e73f02f76f69841162fb9aa126892d2 (diff)
downloadqpid-python-c4f018f7c10d2169ced4c59e776844ee5cf52d33.tar.gz
QPID-346,QPID-386,QPID-403, QPID-410 Rollback, Basic-Reject, QueueBrowser NO_ACK.
QPID-346 Message loss after rollback\recover QPID-386 Updated Transactional Tests to cover underlying AMQP/Qpid state. QPID-403 Implement Basic.Reject QPID-410 Queue Browsers should use not acknowledge messages. ------------------------------------- Broker TxAck - Added comment and fixed white space UnacknowledgedMessage - Added comment for messageDecrement AMQChannel - Added extra debugging. + Created a NonTransactionalContext for requeuing messages as using txContext will tie the requeue to any runing transaction. + Updated message reference counting. So it is in terms of queues don't increment when giving to client. BasicCancelMethodHandler - Added Debug log. BasicConsumeMethodHandler - Reverted to directly writes frames to the session, throwing ChannelException caused problems. Added Trace and debug logging. BasicRejectMethodHandler, ChannelCloseHandler, ConnectionCloseMethodHandler - Added Debug logging AMQPFastProtocolHandler - moved error log to before session.write AMQMessage - Added additional debug via debugIdentity() and comments AMQQueue - Decoupled reference counting from dequeue operation. ConcurrentSelectorDeliveryManager - Added comments and increased info in debug logging SubscriptionImpl - Disabled use of acks for browsers. For now put setDeliveredToConsumer back in the finally block. commented that I'm not sure this is correct as even an error writing to client will cause msg to be marked delivered to consumer. + On Close ensured that it is only called once. + Had problem where closing browser was causing two CancelOk frames to be sent back to client. RequiredDeliveryException - Added comment to explain incrementReference LocalTransactionalContext - Commented out incrementReference as it shouldn't be required here. NonTransactionalContext - Removed incrementReference on deliver + - Fixed bug where browsers - acks would cause messages to be discarded. new JIRA this needs tidied up. TxnBuffer - Added debug logging. Client ------ AMQQueueBrowser - Added comments AMQSession - Added comments and debug + Updated to cause closed consumer to reject messages rather than receive them. + Prevented NoConsumer's from rollingback and rejecting.. they simply clear their SyncQueue - JIRA to ensure clean state with rollback BasicMessageConsumer - Added trace level debuging on close calls + Forced noConsume-rs to use NO_ACK + added more logging Closeable - Updated to use isClosed rather than directly calling _closed.get() to aid in future work on ensuring multi threaded close still allows pending acks to be processed first. ChannelCloseOkMethodHandler - updated comment AMQProtocolSession - Update comments,whitespace TransportConnection - removed static block FlowControllingBlockingQueue - Added isEmpty() Method PropertyValueTest - Added VM Broker setup + Updated test to run once and 50 times to pull out delivery tag problems that were occuring. + Adjusted logging level to be more helpful. moved some info down to trace and debug. MessageRequeueTest - Moved QpidClientConnection its own file. + Fixed it so it actually runs more than one consumer, concurrently.Now 3 was 1. ConcurrentLinkedMessageQueueAtomicSize - Implemented iterator(). Added QueueBrowserTest to system tests to test QueueBrowsering. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@517638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java1
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java106
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java130
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/Closeable.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java60
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java33
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java4
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java71
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java314
-rw-r--r--java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java268
11 files changed, 604 insertions, 400 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
index 053d380129..4662f80c5b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
@@ -49,6 +49,7 @@ public class AMQQueueBrowser implements QueueBrowser
_session = session;
_queue = queue;
_messageSelector = (messageSelector == null) || (messageSelector.trim().length() == 0) ? null : messageSelector;
+ // Create Consumer to verify message selector.
BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
consumer.close();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 61143eee69..184bc44912 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -25,6 +25,7 @@ import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
+import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -294,8 +295,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (_dispatcherLogger.isDebugEnabled())
{
- _dispatcherLogger.debug("Dispatcher Connection " + (connectionStopped ? "Started" : "Stopped") +
- ": Currently " + (currently ? "Started" : "Stopped"));
+ _dispatcherLogger.debug("Set Dispatcher Connection " + (connectionStopped ? "Stopped" : "Started") +
+ ": Currently " + (currently ? "Stopped" : "Started"));
}
}
return currently;
@@ -307,22 +308,31 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
- if (consumer == null)
+ if (consumer == null || consumer.isClosed())
{
if (_dispatcherLogger.isInfoEnabled())
{
- _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
- "[" + message.getDeliverBody().deliveryTag + "] from queue "
- + message.getDeliverBody().consumerTag + " without a handler - rejecting(requeue)...");
+ if (consumer == null)
+ {
+ _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
+ "[" + message.getDeliverBody().deliveryTag + "] from queue " +
+ message.getDeliverBody().consumerTag +
+ " )without a handler - rejecting(requeue)...");
+ }
+ else
+ {
+ _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
+ "[" + message.getDeliverBody().deliveryTag + "] from queue " +
+ " consumer(" + consumer.debugIdentity() +
+ ") is closed rejecting(requeue)...");
+ }
}
rejectMessage(message, true);
}
else
{
-
consumer.notifyMessage(message, _channelId);
-
}
}
}
@@ -354,7 +364,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
for (BasicMessageConsumer consumer : _consumers.values())
{
- consumer.rollback();
+ if (!consumer.isNoConsume())
+ {
+ consumer.rollback();
+ }
+ else
+ {
+ // should perhaps clear the _SQ here.
+ //consumer._synchronousQueue.clear();
+ consumer.clearReceiveQueue();
+ }
+
+
}
setConnectionStopped(isStopped);
@@ -379,8 +400,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// Reject messages on pre-dispatch queue
rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
- // Remove consumer from map.
- deregisterConsumer(consumer);
+ // closeConsumer
+ consumer.markClosed();
_dispatcher.setConnectionStopped(stopped);
@@ -624,6 +645,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void close(long timeout) throws JMSException
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing session: " + this + ":" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ }
+
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
synchronized (_connection.getFailoverMutex())
@@ -2063,26 +2089,39 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// Remove the consumer from the map
BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
if (consumer != null)
- {
- if (consumer.isAutoClose())
+ {
+// fixme this isn't right.. needs to check if _queue contains data for this consumer
+ if (consumer.isAutoClose())// && _queue.isEmpty())
{
consumer.closeWhenNoMessages(true);
}
- //Clean the Maps up first
- //Flush any pending messages for this consumerTag
- if (_dispatcher != null)
+ if (!consumer.isNoConsume())
{
- _logger.info("Dispatcher is not null");
+ //Clean the Maps up first
+ //Flush any pending messages for this consumerTag
+ if (_dispatcher != null)
+ {
+ _logger.info("Dispatcher is not null");
+ }
+ else
+ {
+ _logger.info("Dispatcher is null so created stopped dispatcher");
+
+ startDistpatcherIfNecessary(true);
+ }
+
+ _dispatcher.rejectPending(consumer);
}
else
{
- _logger.info("Dispatcher is null so created stopped dispatcher");
+ //Just close the consumer
+ //fixme the CancelOK is being processed before the arriving messages..
+ // The dispatcher is still to process them so the server sent in order but the client
+ // has yet to receive before the close comes in.
- startDistpatcherIfNecessary(true);
+// consumer.markClosed();
}
-
- _dispatcher.rejectPending(consumer);
}
else
{
@@ -2217,7 +2256,20 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue)
{
Iterator messages = _queue.iterator();
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag +
+ ") (PDispatchQ) requeue:" + requeue);
+ if (messages.hasNext())
+ {
+ _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")");
+ }
+ else
+ {
+ _logger.info("No messages in _queue to reject");
+ }
+ }
while (messages.hasNext())
{
UnprocessedMessage message = (UnprocessedMessage) messages.next();
@@ -2239,10 +2291,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
}
}
- else
- {
- _logger.error("Pruned pending message for consumer:" + consumerTag);
- }
}
}
@@ -2250,9 +2298,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
- if (_logger.isDebugEnabled())
+ if (_logger.isTraceEnabled())
{
- _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
+ _logger.trace("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
}
rejectMessage(message.getDeliverBody().deliveryTag, requeue);
@@ -2260,9 +2308,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void rejectMessage(AbstractJMSMessage message, boolean requeue)
{
- if (_logger.isDebugEnabled())
+ if (_logger.isTraceEnabled())
{
- _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag());
+ _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag());
}
rejectMessage(message.getDeliveryTag(), requeue);
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 9043faa80c..73010ce517 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import java.util.Iterator;
import java.util.List;
+import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -118,7 +119,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
- /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */
+ /** List of tags delievered, The last of which which should be acknowledged on commit in transaction mode. */
private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
/**
@@ -135,6 +136,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private boolean _closeWhenNoMessages;
private boolean _noConsume;
+ private List<StackTraceElement> _closedStack = null;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
@@ -157,6 +159,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
_synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
_autoClose = autoClose;
_noConsume = noConsume;
+
+ //Force queue browsers not to use acknowledge modes.
+ if (_noConsume)
+ {
+ _acknowledgeMode = Session.NO_ACKNOWLEDGE;
+ }
}
public AMQDestination getDestination()
@@ -433,6 +441,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void close(boolean sendClose) throws JMSException
{
+ //synchronized (_closed)
+
if (_logger.isInfoEnabled())
{
_logger.info("Closing consumer:" + debugIdentity());
@@ -442,6 +452,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
if (!_closed.getAndSet(true))
{
+ if (_logger.isTraceEnabled())
+ {
+ if (_closedStack != null)
+ {
+ _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
+ }
+ else
+ {
+ _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6);
+ }
+ }
if (sendClose)
{
// TODO: Be aware of possible changes to parameter order as versions change.
@@ -467,9 +489,15 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
throw new JMSException("Error closing consumer: " + e);
}
}
+ else
+ {
+// //fixme this probably is not right
+// if (!isNoConsume())
+ { //done in BasicCancelOK Handler but not sending one so just deregister.
+ deregisterConsumer();
+ }
+ }
- //done in BasicCancelOK Handler
- //deregisterConsumer();
if (_messageListener != null && _receiving.get())
{
if (_logger.isInfoEnabled())
@@ -488,7 +516,23 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
void markClosed()
{
- _closed.set(true);
+// synchronized (_closed)
+ {
+ _closed.set(true);
+
+ if (_logger.isTraceEnabled())
+ {
+ if (_closedStack != null)
+ {
+ _logger.trace(_consumerTag + " markClosed():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
+ }
+ else
+ {
+ _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
+ }
+ }
+ }
deregisterConsumer();
}
@@ -520,11 +564,24 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
}
- jmsMessage.setConsumer(this);
+// synchronized (_closed)
+ {
+// if (!_closed.get())
+ {
+
+ jmsMessage.setConsumer(this);
- preDeliver(jmsMessage);
+ preDeliver(jmsMessage);
- notifyMessage(jmsMessage, channelId);
+ notifyMessage(jmsMessage, channelId);
+ }
+// else
+// {
+// _logger.error("MESSAGE REJECTING!");
+// _session.rejectMessage(jmsMessage, true);
+// //_logger.error("MESSAGE JUST DROPPED!");
+// }
+ }
}
catch (Exception e)
{
@@ -551,9 +608,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
//we do not need a lock around the test above, and the dispatch below as it is invalid
//for an application to alter an installed listener while the session is started
- preApplicationProcessing(jmsMessage);
- getMessageListener().onMessage(jmsMessage);
- postDeliver(jmsMessage);
+// synchronized (_closed)
+ {
+// if (!_closed.get())
+ {
+
+ preApplicationProcessing(jmsMessage);
+ getMessageListener().onMessage(jmsMessage);
+ postDeliver(jmsMessage);
+ }
+ }
}
else
{
@@ -649,14 +713,30 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
lastDeliveryTag = _receivedDeliveryTags.poll();
}
+ assert _receivedDeliveryTags.isEmpty();
+
_session.acknowledgeMessage(lastDeliveryTag, true);
}
}
void notifyError(Throwable cause)
{
- _closed.set(true);
-
+// synchronized (_closed)
+ {
+ _closed.set(true);
+ if (_logger.isTraceEnabled())
+ {
+ if (_closedStack != null)
+ {
+ _logger.trace(_consumerTag + " notifyError():" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ _logger.trace(_consumerTag + " previously" + _closedStack.toString());
+ }
+ else
+ {
+ _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
+ }
+ }
+ }
//QPID-293 can "request redelivery of this error through dispatcher"
// we have no way of propagating the exception to a message listener - a JMS limitation - so we
@@ -761,14 +841,20 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
clearUnackedMessages();
- if (_logger.isDebugEnabled())
+ if (!_receivedDeliveryTags.isEmpty())
{
- _logger.debug("Rejecting received messages");
+ _logger.debug("Rejecting received messages in _receivedDTs (RQ)");
}
//rollback received but not committed messages
while (!_receivedDeliveryTags.isEmpty())
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)" +
+ "for consumer with tag:" + _consumerTag);
+ }
+
Long tag = _receivedDeliveryTags.poll();
if (tag != null)
@@ -782,12 +868,20 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
+ if (!_receivedDeliveryTags.isEmpty())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Queue _receivedDTs (RQ) was not empty after rejection");
+ }
+ }
+
//rollback pending messages
if (_synchronousQueue.size() > 0)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ")" +
+ _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)" +
"for consumer with tag:" + _consumerTag);
}
Iterator iterator = _synchronousQueue.iterator();
@@ -821,7 +915,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
rollback();
}
- _synchronousQueue.clear();
+ clearReceiveQueue();
}
}
@@ -831,4 +925,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
return String.valueOf(_consumerTag);
}
+ public void clearReceiveQueue()
+ {
+ _synchronousQueue.clear();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/Closeable.java b/java/client/src/main/java/org/apache/qpid/client/Closeable.java
index 6593f2c254..d246dc3931 100644
--- a/java/client/src/main/java/org/apache/qpid/client/Closeable.java
+++ b/java/client/src/main/java/org/apache/qpid/client/Closeable.java
@@ -25,20 +25,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
-/**
- * Provides support for orderly shutdown of an object.
- */
+/** Provides support for orderly shutdown of an object. */
public abstract class Closeable
{
/**
- * We use an atomic boolean so that we do not have to synchronized access to this flag. Synchronizing
- * access to this flag would mean have a synchronized block in every method.
+ * We use an atomic boolean so that we do not have to synchronized access to this flag. Synchronizing access to this
+ * flag would mean have a synchronized block in every method.
*/
protected final AtomicBoolean _closed = new AtomicBoolean(false);
protected void checkNotClosed() throws JMSException
{
- if (_closed.get())
+ if (isClosed())
{
throw new IllegalStateException("Object " + toString() + " has been closed");
}
@@ -46,7 +44,10 @@ public abstract class Closeable
public boolean isClosed()
{
- return _closed.get();
+// synchronized (_closed)
+ {
+ return _closed.get();
+ }
}
public abstract void close() throws JMSException;
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
index 794071cc34..0826deb2f4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
@@ -42,6 +42,6 @@ public class ChannelCloseOkMethodHandler implements StateAwareMethodListener
{
_logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
- //todo this should do the closure
+ //todo this should do the local closure
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 19767b6575..e875b4dca8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -51,10 +51,8 @@ import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.protocol.AMQConstant;
/**
- * Wrapper for protocol session that provides type-safe access to session attributes.
- * <p/>
- * The underlying protocol session is still available but clients should not
- * use it to obtain session attributes.
+ * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
+ * session is still available but clients should not use it to obtain session attributes.
*/
public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
@@ -78,27 +76,23 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected WriteFuture _lastWriteFuture;
/**
- * The handler from which this session was created and which is used to handle protocol events.
- * We send failover events to the handler.
+ * The handler from which this session was created and which is used to handle protocol events. We send failover
+ * events to the handler.
*/
protected final AMQProtocolHandler _protocolHandler;
- /**
- * Maps from the channel id to the AMQSession that it represents.
- */
+ /** Maps from the channel id to the AMQSession that it represents. */
protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
/**
- * Maps from a channel id to an unprocessed message. This is used to tie together the
- * JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
+ * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
+ * first) with the subsequent content header and content bodies.
*/
protected ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
- /**
- * Counter to ensure unique queue names
- */
+ /** Counter to ensure unique queue names */
protected int _queueId = 1;
protected final Object _queueIdLock = new Object();
@@ -108,8 +102,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
/**
- * No-arg constructor for use by test subclass - has to initialise final vars
- * NOT intended for use other then for test
+ * No-arg constructor for use by test subclass - has to initialise final vars NOT intended for use other then for
+ * test
*/
public AMQProtocolSession()
{
@@ -147,7 +141,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
-
+
_minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
}
@@ -207,8 +201,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
/**
* Store the SASL client currently being used for the authentication handshake
*
- * @param client if non-null, stores this in the session. if null clears any existing client
- * being stored
+ * @param client if non-null, stores this in the session. if null clears any existing client being stored
*/
public void setSaslClient(SaslClient client)
{
@@ -237,10 +230,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
/**
- * Callback invoked from the BasicDeliverMethodHandler when a message has been received.
- * This is invoked on the MINA dispatcher thread.
+ * Callback invoked from the BasicDeliverMethodHandler when a message has been received. This is invoked on the MINA
+ * dispatcher thread.
*
* @param message
+ *
* @throws AMQException if this was not expected
*/
public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
@@ -295,8 +289,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
/**
- * Deliver a message to the appropriate session, removing the unprocessed message
- * from our map
+ * Deliver a message to the appropriate session, removing the unprocessed message from our map
*
* @param channelId the channel id the message should be delivered to
* @param msg the message
@@ -309,8 +302,8 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
/**
- * Convenience method that writes a frame to the protocol session. Equivalent
- * to calling getProtocolSession().write().
+ * Convenience method that writes a frame to the protocol session. Equivalent to calling
+ * getProtocolSession().write().
*
* @param frame the frame to write
*/
@@ -377,15 +370,14 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
}
/**
- * Called from the ChannelClose handler when a channel close frame is received.
- * This method decides whether this is a response or an initiation. The latter
- * case causes the AMQSession to be closed and an exception to be thrown if
+ * Called from the ChannelClose handler when a channel close frame is received. This method decides whether this is
+ * a response or an initiation. The latter case causes the AMQSession to be closed and an exception to be thrown if
* appropriate.
*
* @param channelId the id of the channel (session)
- * @return true if the client must respond to the server, i.e. if the server
- * initiated the channel close, false if the channel close is just the server
- * responding to the client's earlier request to close the channel.
+ *
+ * @return true if the client must respond to the server, i.e. if the server initiated the channel close, false if
+ * the channel close is just the server responding to the client's earlier request to close the channel.
*/
public boolean channelClosed(int channelId, AMQConstant code, String text) throws AMQException
{
@@ -450,9 +442,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return new AMQShortString("tmp_" + localAddress + "_" + id);
}
- /**
- * @param delay delay in seconds (not ms)
- */
+ /** @param delay delay in seconds (not ms) */
void initHeartbeats(int delay)
{
if (delay > 0)
@@ -475,7 +465,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_protocolMajorVersion = versionMajor;
_protocolMinorVersion = versionMinor;
- _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
+ _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
}
public byte getProtocolMinorVersion()
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 69042d08ea..8368eee125 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -38,12 +38,10 @@ import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
/**
- * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up
- * the underlying connector, which currently always uses TCP/IP sockets. It creates the
- * "protocol handler" which deals with MINA protocol events.
- * <p/>
- * Could be extended in future to support different transport types by turning this into concrete class/interface
- * combo.
+ * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
+ * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
+ * protocol events. <p/> Could be extended in future to support different transport types by turning this into concrete
+ * class/interface combo.
*/
public class TransportConnection
{
@@ -61,22 +59,6 @@ public class TransportConnection
private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
- static
- {
- _acceptor = new VmPipeAcceptor();
-
- IoServiceConfig config = _acceptor.getDefaultConfig();
-
- config.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
- public static ITransportConnection getInstance() throws AMQTransportConnectionException
- {
- AMQBrokerDetails details = new AMQBrokerDetails();
- details.setTransport(BrokerDetails.TCP);
- return getInstance(details);
- }
-
public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
{
int transport = getTransport(details.getTransport());
@@ -182,7 +164,14 @@ public class TransportConnection
public static void createVMBroker(int port) throws AMQVMBrokerCreationException
{
+ if (_acceptor == null)
+ {
+ _acceptor = new VmPipeAcceptor();
+ IoServiceConfig config = _acceptor.getDefaultConfig();
+
+ config.setThreadModel(ReadWriteThreadModel.getInstance());
+ }
if (!_inVmPipeAddress.containsKey(port))
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
index cb4ef01d25..642b928d81 100644
--- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
@@ -44,6 +44,10 @@ public class FlowControllingBlockingQueue
/** We require a separate count so we can track whether we have reached the threshold */
private int _count;
+ public boolean isEmpty()
+ {
+ return _queue.isEmpty();
+ }
public interface ThresholdListener
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
index fe15fa5155..1e50a62fee 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
@@ -39,8 +39,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.testutil.VMBrokerSetup;
public class PropertyValueTest extends TestCase implements MessageListener
{
@@ -59,19 +59,13 @@ public class PropertyValueTest extends TestCase implements MessageListener
protected void setUp() throws Exception
{
super.setUp();
- try
- {
- init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
- }
- catch (Exception e)
- {
- fail("Unable to initialilse connection: " + e);
- }
+ TransportConnection.createVMBroker(1);
}
protected void tearDown() throws Exception
{
super.tearDown();
+ TransportConnection.killVMBroker(1);
}
private void init(AMQConnection connection) throws Exception
@@ -91,14 +85,48 @@ public class PropertyValueTest extends TestCase implements MessageListener
connection.start();
}
- public void test() throws Exception
+ public void testOnce()
{
- int count = _count;
- send(count);
- waitFor(count);
- check();
- _logger.info("Completed without failure");
- _connection.close();
+ runBatch(1);
+ }
+
+ public void test50()
+ {
+ runBatch(50);
+ }
+
+ private void runBatch(int runSize)
+ {
+ try
+ {
+ int run = 0;
+ while (run < runSize)
+ {
+ _logger.error("Run Number:" + run++);
+ try
+ {
+ init(new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "test"));
+ }
+ catch (Exception e)
+ {
+ fail("Unable to initialilse connection: " + e);
+ }
+
+ int count = _count;
+ send(count);
+ waitFor(count);
+ check();
+ _logger.info("Completed without failure");
+ _connection.close();
+
+ _logger.error("End Run Number:" + (run - 1));
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error(e.getMessage(), e);
+ e.printStackTrace();
+ }
}
void send(int count) throws JMSException
@@ -138,7 +166,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
m.setJMSReplyTo(q);
m.setStringProperty("TempQueue", q.toString());
- _logger.info("Message:" + m);
+ _logger.trace("Message:" + m);
Assert.assertEquals("Check temp queue has been set correctly",
m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue"));
@@ -150,7 +178,7 @@ public class PropertyValueTest extends TestCase implements MessageListener
m.setShortProperty("Short", (short) Short.MAX_VALUE);
m.setStringProperty("String", "Test");
- _logger.info("Sending Msg:" + m);
+ _logger.debug("Sending Msg:" + m);
producer.send(m);
}
}
@@ -206,8 +234,11 @@ public class PropertyValueTest extends TestCase implements MessageListener
Assert.assertEquals("Check String properties are correctly transported",
"Test", m.getStringProperty("String"));
}
+ received.clear();
assertEqual(messages.iterator(), actual.iterator());
+
+ messages.clear();
}
private static void assertEqual(Iterator expected, Iterator actual)
@@ -269,11 +300,11 @@ public class PropertyValueTest extends TestCase implements MessageListener
{
test._count = Integer.parseInt(argv[1]);
}
- test.test();
+ test.testOnce();
}
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(PropertyValueTest.class));
+ return new junit.framework.TestSuite(PropertyValueTest.class);
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
index a56bae3d70..7762cb3fe9 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
@@ -42,6 +42,7 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.testutil.QpidClientConnection;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
@@ -62,14 +63,14 @@ public class MessageRequeueTest extends TestCase
private boolean testReception = true;
private long[] receieved = new long[numTestMessages + 1];
- private boolean passed=false;
+ private boolean passed = false;
protected void setUp() throws Exception
{
super.setUp();
TransportConnection.createVMBroker(1);
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
// clear queue
@@ -85,21 +86,28 @@ public class MessageRequeueTest extends TestCase
{
super.tearDown();
- if (!passed)
+ if (!passed) // clean up
{
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
// clear queue
conn.consume(queue, consumeTimeout);
+
+ conn.disconnect();
}
TransportConnection.killVMBroker(1);
}
- /** multiple consumers */
+ /**
+ * multiple consumers
+ *
+ * @throws javax.jms.JMSException if a JMS problem occurs
+ * @throws InterruptedException on timeout
+ */
public void testDrain() throws JMSException, InterruptedException
{
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
@@ -170,6 +178,7 @@ public class MessageRequeueTest extends TestCase
assertEquals(list.toString(), 0, failed);
_logger.info("consumed: " + messagesReceived);
conn.disconnect();
+ passed = true;
}
/** multiple consumers */
@@ -186,8 +195,8 @@ public class MessageRequeueTest extends TestCase
Thread t4 = new Thread(c4);
t1.start();
-// t2.start();
-// t3.start();
+ t2.start();
+ t3.start();
// t4.start();
try
@@ -230,7 +239,7 @@ public class MessageRequeueTest extends TestCase
}
assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
- passed=true;
+ passed = true;
}
class Consumer implements Runnable
@@ -248,7 +257,7 @@ public class MessageRequeueTest extends TestCase
try
{
_logger.info("consumer-" + id + ": starting");
- QpidClientConnection conn = new QpidClientConnection();
+ QpidClientConnection conn = new QpidClientConnection(BROKER);
conn.connect();
@@ -318,286 +327,51 @@ public class MessageRequeueTest extends TestCase
}
- public class QpidClientConnection implements ExceptionListener
+ public void testRequeue() throws JMSException, AMQException, URLSyntaxException
{
- private boolean transacted = true;
- private int ackMode = Session.CLIENT_ACKNOWLEDGE;
- private Connection connection;
-
- private String virtualHost;
- private String brokerlist;
- private int prefetch;
- protected Session session;
- protected boolean connected;
-
- public QpidClientConnection()
- {
- super();
- setVirtualHost("/test");
- setBrokerList(BROKER);
- setPrefetch(5000);
- }
-
-
- public void connect() throws JMSException
- {
- if (!connected)
- {
- /*
- * amqp://[user:pass@][clientid]/virtualhost?
- * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
- * [&failover='method[?option='value'[&option='value']]']
- * [&option='value']"
- */
- String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- try
- {
- AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
- _logger.info("connecting to Qpid :" + brokerUrl);
- connection = factory.createConnection();
-
- // register exception listener
- connection.setExceptionListener(this);
-
- session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
-
-
- _logger.info("starting connection");
- connection.start();
-
- connected = true;
- }
- catch (URLSyntaxException e)
- {
- throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
- }
- }
- }
-
- public void disconnect() throws JMSException
- {
- if (connected)
- {
- session.commit();
- session.close();
- connection.close();
- connected = false;
- _logger.info("disconnected");
- }
- }
-
- public void disconnectWithoutCommit() throws JMSException
- {
- if (connected)
- {
- session.close();
- connection.close();
- connected = false;
- _logger.info("disconnected without commit");
- }
- }
-
- public String getBrokerList()
- {
- return brokerlist;
- }
-
- public void setBrokerList(String brokerlist)
- {
- this.brokerlist = brokerlist;
- }
-
- public String getVirtualHost()
+ int run = 0;
+ while (run < 10)
{
- return virtualHost;
- }
-
- public void setVirtualHost(String virtualHost)
- {
- this.virtualHost = virtualHost;
- }
-
- public void setPrefetch(int prefetch)
- {
- this.prefetch = prefetch;
- }
+ run++;
-
- /** override as necessary */
- public void onException(JMSException exception)
- {
- _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
- }
-
- public boolean isConnected()
- {
- return connected;
- }
-
- public Session getSession()
- {
- return session;
- }
-
- /**
- * Put a String as a text messages, repeat n times. A null payload will result in a null message.
- *
- * @param queueName The queue name to put to
- * @param payload the content of the payload
- * @param copies the number of messages to put
- *
- * @throws javax.jms.JMSException any exception that occurs
- */
- public void put(String queueName, String payload, int copies) throws JMSException
- {
- if (!connected)
- {
- connect();
- }
-
- _logger.info("putting to queue " + queueName);
- Queue queue = session.createQueue(queueName);
-
- final MessageProducer sender = session.createProducer(queue);
-
- for (int i = 0; i < copies; i++)
- {
- Message m = session.createTextMessage(payload + i);
- m.setIntProperty("index", i + 1);
- sender.send(m);
- }
-
- session.commit();
- sender.close();
- _logger.info("put " + copies + " copies");
- }
-
- /**
- * GET the top message on a queue. Consumes the message. Accepts timeout value.
- *
- * @param queueName The quename to get from
- * @param readTimeout The timeout to use
- *
- * @return the content of the text message if any
- *
- * @throws javax.jms.JMSException any exception that occured
- */
- public Message getNextMessage(String queueName, long readTimeout) throws JMSException
- {
- if (!connected)
+ if (_logger.isInfoEnabled())
{
- connect();
+ _logger.info("testRequeue run " + run);
}
- Queue queue = session.createQueue(queueName);
+ String virtualHost = "/test";
+ String brokerlist = BROKER;
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- final MessageConsumer consumer = session.createConsumer(queue);
+ Connection conn = new AMQConnection(brokerUrl);
+ Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue q = session.createQueue(queue);
- Message message = consumer.receive(readTimeout);
- session.commit();
- consumer.close();
-
- Message result;
+ _logger.debug("Create Consumer");
+ MessageConsumer consumer = session.createConsumer(q);
- // all messages we consume should be TextMessages
- if (message instanceof TextMessage)
- {
- result = ((TextMessage) message);
- }
- else if (null == message)
+ try
{
- result = null;
+ Thread.sleep(2000);
}
- else
+ catch (InterruptedException e)
{
- _logger.info("warning: received non-text message");
- result = message;
+ //
}
- return result;
- }
+ _logger.debug("Receiving msg");
+ Message msg = consumer.receive(1000);
- /**
- * GET the top message on a queue. Consumes the message.
- *
- * @param queueName The Queuename to get from
- *
- * @return The string content of the text message, if any received
- *
- * @throws javax.jms.JMSException any exception that occurs
- */
- public Message getNextMessage(String queueName) throws JMSException
- {
- return getNextMessage(queueName, 0);
- }
+ assertNotNull("Message should not be null", msg);
- /**
- * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
- *
- * @param queueName The Queue name to consume from
- * @param readTimeout The timeout for each consume
- *
- * @throws javax.jms.JMSException Any exception that occurs during the consume
- * @throws InterruptedException If the consume thread was interrupted during a consume.
- */
- public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
- {
- if (!connected)
- {
- connect();
- }
-
- _logger.info("consuming queue " + queueName);
- Queue queue = session.createQueue(queueName);
-
- final MessageConsumer consumer = session.createConsumer(queue);
- int messagesReceived = 0;
-
- _logger.info("consuming...");
- while ((consumer.receive(readTimeout)) != null)
- {
- messagesReceived++;
- }
- session.commit();
+ // As we have not ack'd message will be requeued.
+ _logger.debug("Close Consumer");
consumer.close();
- _logger.info("consumed: " + messagesReceived);
- }
- }
-
-
- public void testRequeue() throws JMSException, AMQException, URLSyntaxException
- {
- String virtualHost = "/test";
- String brokerlist = "vm://:1";
- String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
- Connection conn = new AMQConnection(brokerUrl);
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue q = session.createQueue(queue);
-
- _logger.info("Create Consumer");
- MessageConsumer consumer = session.createConsumer(q);
-
- try
- {
- Thread.sleep(2000);
- }
- catch (InterruptedException e)
- {
- //
+ _logger.debug("Close Connection");
+ conn.close();
}
-
- _logger.info("Receiving msg");
- Message msg = consumer.receive();
-
- assertNotNull("Message should not be null", msg);
-
- _logger.info("Close Consumer");
- consumer.close();
-
- _logger.info("Close Connection");
- conn.close();
}
} \ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
new file mode 100644
index 0000000000..f2afa472ab
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
@@ -0,0 +1,268 @@
+package org.apache.qpid.testutil;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.log4j.Logger;
+
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+
+public class QpidClientConnection implements ExceptionListener
+{
+
+ private static final Logger _logger = Logger.getLogger(QpidClientConnection.class);
+
+ private boolean transacted = true;
+ private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+ private Connection connection;
+
+ private String virtualHost;
+ private String brokerlist;
+ private int prefetch;
+ protected Session session;
+ protected boolean connected;
+
+ public QpidClientConnection(String broker)
+ {
+ super();
+ setVirtualHost("/test");
+ setBrokerList(broker);
+ setPrefetch(5000);
+ }
+
+
+ public void connect() throws JMSException
+ {
+ if (!connected)
+ {
+ /*
+ * amqp://[user:pass@][clientid]/virtualhost?
+ * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+ * [&failover='method[?option='value'[&option='value']]']
+ * [&option='value']"
+ */
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+ try
+ {
+ AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+ _logger.info("connecting to Qpid :" + brokerUrl);
+ connection = factory.createConnection();
+
+ // register exception listener
+ connection.setExceptionListener(this);
+
+ session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+
+ _logger.info("starting connection");
+ connection.start();
+
+ connected = true;
+ }
+ catch (URLSyntaxException e)
+ {
+ throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+ }
+ }
+ }
+
+ public void disconnect() throws JMSException
+ {
+ if (connected)
+ {
+ session.commit();
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected");
+ }
+ }
+
+ public void disconnectWithoutCommit() throws JMSException
+ {
+ if (connected)
+ {
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected without commit");
+ }
+ }
+
+ public String getBrokerList()
+ {
+ return brokerlist;
+ }
+
+ public void setBrokerList(String brokerlist)
+ {
+ this.brokerlist = brokerlist;
+ }
+
+ public String getVirtualHost()
+ {
+ return virtualHost;
+ }
+
+ public void setVirtualHost(String virtualHost)
+ {
+ this.virtualHost = virtualHost;
+ }
+
+ public void setPrefetch(int prefetch)
+ {
+ this.prefetch = prefetch;
+ }
+
+
+ /** override as necessary */
+ public void onException(JMSException exception)
+ {
+ _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+ }
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ /**
+ * Put a String as a text messages, repeat n times. A null payload will result in a null message.
+ *
+ * @param queueName The queue name to put to
+ * @param payload the content of the payload
+ * @param copies the number of messages to put
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public void put(String queueName, String payload, int copies) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("putting to queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageProducer sender = session.createProducer(queue);
+
+ for (int i = 0; i < copies; i++)
+ {
+ Message m = session.createTextMessage(payload + i);
+ m.setIntProperty("index", i + 1);
+ sender.send(m);
+ }
+
+ session.commit();
+ sender.close();
+ _logger.info("put " + copies + " copies");
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message. Accepts timeout value.
+ *
+ * @param queueName The quename to get from
+ * @param readTimeout The timeout to use
+ *
+ * @return the content of the text message if any
+ *
+ * @throws javax.jms.JMSException any exception that occured
+ */
+ public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(readTimeout);
+ session.commit();
+ consumer.close();
+
+ Message result;
+
+ // all messages we consume should be TextMessages
+ if (message instanceof TextMessage)
+ {
+ result = ((TextMessage) message);
+ }
+ else if (null == message)
+ {
+ result = null;
+ }
+ else
+ {
+ _logger.info("warning: received non-text message");
+ result = message;
+ }
+
+ return result;
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message.
+ *
+ * @param queueName The Queuename to get from
+ *
+ * @return The string content of the text message, if any received
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public Message getNextMessage(String queueName) throws JMSException
+ {
+ return getNextMessage(queueName, 0);
+ }
+
+ /**
+ * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+ *
+ * @param queueName The Queue name to consume from
+ * @param readTimeout The timeout for each consume
+ *
+ * @throws javax.jms.JMSException Any exception that occurs during the consume
+ * @throws InterruptedException If the consume thread was interrupted during a consume.
+ */
+ public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("consuming queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+ int messagesReceived = 0;
+
+ _logger.info("consuming...");
+ while ((consumer.receive(readTimeout)) != null)
+ {
+ messagesReceived++;
+ }
+
+ session.commit();
+ consumer.close();
+ _logger.info("consumed: " + messagesReceived);
+ }
+}
+