summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-07-11 11:01:06 +0000
committerRobert Gemmell <robbie@apache.org>2012-07-11 11:01:06 +0000
commitbf4878d834c41c4c6b90fa8ff327623a82495340 (patch)
tree263e560833081d2c9fedd1f975647e328dbbb572 /qpid/java/broker
parente16f7d01e8a9c049dc99a08cd38634f174e2d41a (diff)
downloadqpid-python-bf4878d834c41c4c6b90fa8ff327623a82495340.tar.gz
QPID-3998, QPID-3999: System tests for Rest API, small fixups for the adapters and web ui.
Applied patch from Oleksandr Rudyy <orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1360121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/QueueType.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java6
9 files changed, 91 insertions, 10 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 71a4a84323..fead99a79d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -1686,4 +1686,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
{
return getQMFId().compareTo(session.getQMFId());
}
+
+ @Override
+ public int getConsumerCount()
+ {
+ return _tag2SubscriptionMap.size();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java
index 5dda4d66cd..bf703e6fbe 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -99,7 +99,7 @@ public interface Queue extends ConfiguredObject
public static final String QUEUE_FLOW_STOPPED = "queueFlowStopped";
public static final String SORT_KEY = "sortKey";
public static final String TYPE = "type";
-
+ public static final String PRIORITIES = "priorities";
@@ -132,7 +132,8 @@ public interface Queue extends ConfiguredObject
ALERT_THRESHOLD_MESSAGE_SIZE,
ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
- ALERT_REPEAT_GAP
+ ALERT_REPEAT_GAP,
+ PRIORITIES
));
//children
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/QueueType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/QueueType.java
new file mode 100644
index 0000000000..96f2a7e2e5
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/QueueType.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+public enum QueueType
+{
+ STANDARD,
+ PRIORITY,
+ LVQ,
+ SORTED
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index 24df5e97fa..21c4aef323 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -58,9 +58,9 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, "x-qpid-capacity");
QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, "x-qpid-flow-resume-capacity");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY, "qpid.sort_key");
- QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY, "qpid.last_value_queue_key");
-
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY, AMQQueueFactory.QPID_QUEUE_SORT_KEY);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY);
+ QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.PRIORITIES, AMQQueueFactory.X_QPID_PRIORITIES);
}
private final AMQQueue _queue;
@@ -454,7 +454,13 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
{
return _queue.getDescription();
}
-
+ else if(PRIORITIES.equals(name))
+ {
+ if(_queue instanceof AMQPriorityQueue)
+ {
+ return ((AMQPriorityQueue)_queue).getPriorities();
+ }
+ }
return super.getAttribute(name);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
index fa0293ad18..d802697d67 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
@@ -198,8 +198,7 @@ final class SessionAdapter extends AbstractAdapter implements Session
}
else if(name.equals(CONSUMER_COUNT))
{
- final Collection<Consumer> subscriptions = getSubscriptions();
- return subscriptions == null ? 0 : subscriptions.size();
+ return _session.getConsumerCount();
}
else if(name.equals(LOCAL_TRANSACTION_BEGINS))
{
@@ -207,12 +206,12 @@ final class SessionAdapter extends AbstractAdapter implements Session
}
else if(name.equals(LOCAL_TRANSACTION_OPEN))
{
- long open = _session.getTxnCount() - (_session.getTxnCommits() + _session.getTxnRejects());
+ long open = _session.getTxnStart() - (_session.getTxnCommits() + _session.getTxnRejects());
return (Boolean) (open > 0l);
}
else if(name.equals(LOCAL_TRANSACTION_ROLLBACKS))
{
- return _session.getTxnCommits();
+ return _session.getTxnRejects();
}
else if(name.equals(STATE_CHANGED))
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 52cf806be2..bcfdb22fa9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -43,6 +43,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.QueueType;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -252,6 +253,31 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E
{
attributes = new HashMap<String, Object>(attributes);
+ if (attributes.containsKey(Queue.TYPE))
+ {
+ String typeAttribute = getStringAttribute(Queue.TYPE, attributes, null);
+ QueueType queueType = null;
+ try
+ {
+ queueType = QueueType.valueOf(typeAttribute.toUpperCase());
+ }
+ catch(Exception e)
+ {
+ throw new IllegalArgumentException("Unsupported queue type :" + typeAttribute);
+ }
+ if (queueType == QueueType.LVQ && attributes.get(Queue.LVQ_KEY) == null)
+ {
+ attributes.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LVQ_KEY);
+ }
+ else if (queueType == QueueType.PRIORITY && attributes.get(Queue.PRIORITIES) == null)
+ {
+ attributes.put(Queue.PRIORITIES, 10);
+ }
+ else if (queueType == QueueType.SORTED && attributes.get(Queue.SORT_KEY) == null)
+ {
+ throw new IllegalArgumentException("Sort key is not specified for sorted queue");
+ }
+ }
String name = getStringAttribute(Queue.NAME, attributes, null);
State state = getEnumAttribute(State.class, Queue.STATE, attributes, State.ACTIVE);
boolean durable = getBooleanAttribute(Queue.DURABLE, attributes, false);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index d18353682b..a8f62b0fa2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -84,4 +84,6 @@ public interface AMQSessionModel extends Comparable<AMQSessionModel>
Long getTxnRejects();
int getChannelId();
+
+ int getConsumerCount();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index f0ba5646e0..5b5540897b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -1070,4 +1070,10 @@ public class ServerSession extends Session
{
return getQMFId().compareTo(session.getQMFId());
}
+
+ @Override
+ public int getConsumerCount()
+ {
+ return _subscriptions.values().size();
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 51d20b6d5f..8c5d2684ff 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -403,6 +403,12 @@ public class MockSubscription implements Subscription
{
return 0;
}
+
+ @Override
+ public int getConsumerCount()
+ {
+ return 0;
+ }
}
private static class MockConnectionModel implements AMQConnectionModel