From a16002f9be0a06da956eb548d70a3fcd1adeab89 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 19 Jun 2008 09:01:59 +0000 Subject: QPID-950 : Broker refactoring, copied / merged from branch git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@669431 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 27 ++++++++++++++-- .../org/apache/qpid/client/AMQSession_0_10.java | 7 ++-- .../org/apache/qpid/client/AMQSession_0_8.java | 37 ++++++++++++++++++++-- .../client/MessageListenerMultiConsumerTest.java | 19 ++++++++--- 4 files changed, 79 insertions(+), 11 deletions(-) (limited to 'qpid/java/client') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 3b51fb8db0..75e8cbe155 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1014,6 +1014,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } + /** * Declares the named queue. * @@ -1030,19 +1031,41 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess */ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException + { + createQueue(name, autoDelete, durable, exclusive, null); + } + + + /** + * Declares the named queue. + * + *

Note that this operation automatically retries in the event of fail-over. + * + * @param name The name of the queue to declare. + * @param autoDelete + * @param durable Flag to indicate that the queue is durable. + * @param exclusive Flag to indicate that the queue is exclusive to this client. + * @param arguments Arguments used to set special properties of the queue + * + * @throws AMQException If the queue cannot be declared for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, + final boolean exclusive, final Map arguments) throws AMQException { new FailoverRetrySupport(new FailoverProtectedOperation() { public Object execute() throws AMQException, FailoverException { - sendCreateQueue(name, autoDelete, durable, exclusive); + sendCreateQueue(name, autoDelete, durable, exclusive, arguments); return null; } }, _connection).execute(); } public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, - final boolean exclusive)throws AMQException, FailoverException; + final boolean exclusive, final Map arguments)throws AMQException, FailoverException; /** * Creates a QueueReceiver * diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 4c3d768020..4f9c4edc75 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -243,13 +243,14 @@ public class AMQSession_0_10 extends AMQSession * @param durable If set when creating a new queue, * the queue will be marked as durable. * @param exclusive Exclusive queues can only be used from one connection at a time. + * @param arguments Exclusive queues can only be used from one connection at a time. * @throws AMQException * @throws FailoverException */ public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, - final boolean exclusive) throws AMQException, FailoverException + final boolean exclusive, Map arguments) throws AMQException, FailoverException { - getQpidSession().queueDeclare(name.toString(), null, null, durable ? Option.DURABLE : Option.NO_OPTION, + getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NO_OPTION, autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION, exclusive ? Option.EXCLUSIVE : Option.NO_OPTION); // We need to sync so that we get notify of an error. @@ -594,7 +595,7 @@ public class AMQSession_0_10 extends AMQSession try { // this is done so that we can produce to a temporary queue beofre we create a consumer - sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive()); + sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive(),null); sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result); result.setQueueName(result.getRoutingKey()); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 00aa8e4d31..e0e319250e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -40,6 +40,8 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; + public final class AMQSession_0_8 extends AMQSession { @@ -125,10 +127,19 @@ public final class AMQSession_0_8 extends AMQSession handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class); } - public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException, + public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map arguments) throws AMQException, FailoverException { - QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null); + FieldTable table = null; + if(arguments != null && !arguments.isEmpty()) + { + table = new FieldTable(); + for(Map.Entry entry : arguments.entrySet()) + { + table.setObject(entry.getKey(), entry.getValue()); + } + } + QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table); AMQFrame queueDeclare = body.generateFrame(_channelId); getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); } @@ -412,6 +423,28 @@ public final class AMQSession_0_8 extends AMQSession return subscriber; } + + + + public void setPrefecthLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException + { + new FailoverRetrySupport( + new FailoverProtectedOperation() + { + public Object execute() throws AMQException, FailoverException + { + + BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false); + + // todo send low water mark when protocol allows. + // todo Be aware of possible changes to parameter order as versions change. + getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); + + return null; + } + }, _connection).execute(); + } + class QueueDeclareOkHandler extends SpecificMethodFrameListener { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java index 136b9b94b6..85ee78bd17 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -206,16 +206,27 @@ public class MessageListenerMultiConsumerTest extends QpidTestCase MessageConsumer consumer2 = _clientSession1.createConsumer(_queue); - for (int msg = 0; msg < (MSG_COUNT / 2); msg++) + int msg; + for (msg = 0; msg < (MSG_COUNT / 2); msg++) { - assertTrue(_consumer1.receive(3000) != null); + + final Message message = _consumer1.receive(1000); + if(message == null) + { + break; + } + } - for (int msg = 0; msg < (MSG_COUNT / 2); msg++) + _consumer1.close(); + _clientSession1.close(); + + for (; msg < MSG_COUNT ; msg++) { - assertTrue(consumer2.receive(3000) != null); + assertTrue("Failed at msg id" + msg, _consumer2.receive(1000) != null); } + } else { -- cgit v1.2.1