diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2008-06-19 09:01:59 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2008-06-19 09:01:59 +0000 |
| commit | a16002f9be0a06da956eb548d70a3fcd1adeab89 (patch) | |
| tree | 084111ed34934906c5bb94e5e796e0bfd4219058 /qpid/java/client | |
| parent | ecd6b1be18c7b65bd09f65a1ba4b9d80366146b0 (diff) | |
| download | qpid-python-a16002f9be0a06da956eb548d70a3fcd1adeab89.tar.gz | |
QPID-950 : Broker refactoring, copied / merged from branch
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@669431 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
4 files changed, 79 insertions, 11 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 3b51fb8db0..75e8cbe155 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1014,6 +1014,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } } + /** * Declares the named queue. * @@ -1031,18 +1032,40 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException { + createQueue(name, autoDelete, durable, exclusive, null); + } + + + /** + * Declares the named queue. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param name The name of the queue to declare. + * @param autoDelete + * @param durable Flag to indicate that the queue is durable. + * @param exclusive Flag to indicate that the queue is exclusive to this client. + * @param arguments Arguments used to set special properties of the queue + * + * @throws AMQException If the queue cannot be declared for any reason. + * + * @todo Be aware of possible changes to parameter order as versions change. + */ + public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, + final boolean exclusive, final Map<String, Object> arguments) throws AMQException + { new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - sendCreateQueue(name, autoDelete, durable, exclusive); + sendCreateQueue(name, autoDelete, durable, exclusive, arguments); return null; } }, _connection).execute(); } public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, - final boolean exclusive)throws AMQException, FailoverException; + final boolean exclusive, final Map<String, Object> arguments)throws AMQException, FailoverException; /** * Creates a QueueReceiver * diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 4c3d768020..4f9c4edc75 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -243,13 +243,14 @@ public class AMQSession_0_10 extends AMQSession * @param durable If set when creating a new queue, * the queue will be marked as durable. * @param exclusive Exclusive queues can only be used from one connection at a time. + * @param arguments Exclusive queues can only be used from one connection at a time. * @throws AMQException * @throws FailoverException */ public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, - final boolean exclusive) throws AMQException, FailoverException + final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException { - getQpidSession().queueDeclare(name.toString(), null, null, durable ? Option.DURABLE : Option.NO_OPTION, + getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NO_OPTION, autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION, exclusive ? Option.EXCLUSIVE : Option.NO_OPTION); // We need to sync so that we get notify of an error. @@ -594,7 +595,7 @@ public class AMQSession_0_10 extends AMQSession try { // this is done so that we can produce to a temporary queue beofre we create a consumer - sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive()); + sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive(),null); sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result); result.setQueueName(result.getRoutingKey()); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 00aa8e4d31..e0e319250e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -40,6 +40,8 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; + public final class AMQSession_0_8 extends AMQSession { @@ -125,10 +127,19 @@ public final class AMQSession_0_8 extends AMQSession handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class); } - public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException, + public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException, FailoverException { - QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null); + FieldTable table = null; + if(arguments != null && !arguments.isEmpty()) + { + table = new FieldTable(); + for(Map.Entry<String, Object> entry : arguments.entrySet()) + { + table.setObject(entry.getKey(), entry.getValue()); + } + } + QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table); AMQFrame queueDeclare = body.generateFrame(_channelId); getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); } @@ -412,6 +423,28 @@ public final class AMQSession_0_8 extends AMQSession return subscriber; } + + + + public void setPrefecthLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException + { + new FailoverRetrySupport<Object, AMQException>( + new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + + BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false); + + // todo send low water mark when protocol allows. + // todo Be aware of possible changes to parameter order as versions change. + getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); + + return null; + } + }, _connection).execute(); + } + class QueueDeclareOkHandler extends SpecificMethodFrameListener { diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java index 136b9b94b6..85ee78bd17 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java @@ -206,16 +206,27 @@ public class MessageListenerMultiConsumerTest extends QpidTestCase MessageConsumer consumer2 = _clientSession1.createConsumer(_queue); - for (int msg = 0; msg < (MSG_COUNT / 2); msg++) + int msg; + for (msg = 0; msg < (MSG_COUNT / 2); msg++) { - assertTrue(_consumer1.receive(3000) != null); + + final Message message = _consumer1.receive(1000); + if(message == null) + { + break; + } + } - for (int msg = 0; msg < (MSG_COUNT / 2); msg++) + _consumer1.close(); + _clientSession1.close(); + + for (; msg < MSG_COUNT ; msg++) { - assertTrue(consumer2.receive(3000) != null); + assertTrue("Failed at msg id" + msg, _consumer2.receive(1000) != null); } + } else { |
