summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-06-19 09:01:59 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-06-19 09:01:59 +0000
commita16002f9be0a06da956eb548d70a3fcd1adeab89 (patch)
tree084111ed34934906c5bb94e5e796e0bfd4219058 /qpid/java/client
parentecd6b1be18c7b65bd09f65a1ba4b9d80366146b0 (diff)
downloadqpid-python-a16002f9be0a06da956eb548d70a3fcd1adeab89.tar.gz
QPID-950 : Broker refactoring, copied / merged from branch
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@669431 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java27
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java7
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java37
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java19
4 files changed, 79 insertions, 11 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 3b51fb8db0..75e8cbe155 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1014,6 +1014,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
}
+
/**
* Declares the named queue.
*
@@ -1031,18 +1032,40 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
final boolean exclusive) throws AMQException
{
+ createQueue(name, autoDelete, durable, exclusive, null);
+ }
+
+
+ /**
+ * Declares the named queue.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param name The name of the queue to declare.
+ * @param autoDelete
+ * @param durable Flag to indicate that the queue is durable.
+ * @param exclusive Flag to indicate that the queue is exclusive to this client.
+ * @param arguments Arguments used to set special properties of the queue
+ *
+ * @throws AMQException If the queue cannot be declared for any reason.
+ *
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
+ final boolean exclusive, final Map<String, Object> arguments) throws AMQException
+ {
new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- sendCreateQueue(name, autoDelete, durable, exclusive);
+ sendCreateQueue(name, autoDelete, durable, exclusive, arguments);
return null;
}
}, _connection).execute();
}
public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
- final boolean exclusive)throws AMQException, FailoverException;
+ final boolean exclusive, final Map<String, Object> arguments)throws AMQException, FailoverException;
/**
* Creates a QueueReceiver
*
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 4c3d768020..4f9c4edc75 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -243,13 +243,14 @@ public class AMQSession_0_10 extends AMQSession
* @param durable If set when creating a new queue,
* the queue will be marked as durable.
* @param exclusive Exclusive queues can only be used from one connection at a time.
+ * @param arguments Exclusive queues can only be used from one connection at a time.
* @throws AMQException
* @throws FailoverException
*/
public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
- final boolean exclusive) throws AMQException, FailoverException
+ final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException
{
- getQpidSession().queueDeclare(name.toString(), null, null, durable ? Option.DURABLE : Option.NO_OPTION,
+ getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NO_OPTION,
autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION,
exclusive ? Option.EXCLUSIVE : Option.NO_OPTION);
// We need to sync so that we get notify of an error.
@@ -594,7 +595,7 @@ public class AMQSession_0_10 extends AMQSession
try
{
// this is done so that we can produce to a temporary queue beofre we create a consumer
- sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive());
+ sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(), result.isDurable(), result.isExclusive(),null);
sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new FieldTable(), result.getExchangeName(),result);
result.setQueueName(result.getRoutingKey());
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 00aa8e4d31..e0e319250e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -40,6 +40,8 @@ import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
public final class AMQSession_0_8 extends AMQSession
{
@@ -125,10 +127,19 @@ public final class AMQSession_0_8 extends AMQSession
handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
}
- public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException,
+ public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException,
FailoverException
{
- QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null);
+ FieldTable table = null;
+ if(arguments != null && !arguments.isEmpty())
+ {
+ table = new FieldTable();
+ for(Map.Entry<String, Object> entry : arguments.entrySet())
+ {
+ table.setObject(entry.getKey(), entry.getValue());
+ }
+ }
+ QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
AMQFrame queueDeclare = body.generateFrame(_channelId);
getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
}
@@ -412,6 +423,28 @@ public final class AMQSession_0_8 extends AMQSession
return subscriber;
}
+
+
+
+ public void setPrefecthLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException
+ {
+ new FailoverRetrySupport<Object, AMQException>(
+ new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+
+ BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+
+ // todo send low water mark when protocol allows.
+ // todo Be aware of possible changes to parameter order as versions change.
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+
+ return null;
+ }
+ }, _connection).execute();
+ }
+
class QueueDeclareOkHandler extends SpecificMethodFrameListener
{
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
index 136b9b94b6..85ee78bd17 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
@@ -206,16 +206,27 @@ public class MessageListenerMultiConsumerTest extends QpidTestCase
MessageConsumer consumer2 = _clientSession1.createConsumer(_queue);
- for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
+ int msg;
+ for (msg = 0; msg < (MSG_COUNT / 2); msg++)
{
- assertTrue(_consumer1.receive(3000) != null);
+
+ final Message message = _consumer1.receive(1000);
+ if(message == null)
+ {
+ break;
+ }
+
}
- for (int msg = 0; msg < (MSG_COUNT / 2); msg++)
+ _consumer1.close();
+ _clientSession1.close();
+
+ for (; msg < MSG_COUNT ; msg++)
{
- assertTrue(consumer2.receive(3000) != null);
+ assertTrue("Failed at msg id" + msg, _consumer2.receive(1000) != null);
}
+
}
else
{