diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-07-11 11:01:06 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-07-11 11:01:06 +0000 |
| commit | bf4878d834c41c4c6b90fa8ff327623a82495340 (patch) | |
| tree | 263e560833081d2c9fedd1f975647e328dbbb572 /qpid/java/broker | |
| parent | e16f7d01e8a9c049dc99a08cd38634f174e2d41a (diff) | |
| download | qpid-python-bf4878d834c41c4c6b90fa8ff327623a82495340.tar.gz | |
QPID-3998, QPID-3999: System tests for Rest API, small fixups for the adapters and web ui.
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1360121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
9 files changed, 91 insertions, 10 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 71a4a84323..fead99a79d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -1686,4 +1686,10 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm { return getQMFId().compareTo(session.getQMFId()); } + + @Override + public int getConsumerCount() + { + return _tag2SubscriptionMap.size(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java index 5dda4d66cd..bf703e6fbe 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java @@ -99,7 +99,7 @@ public interface Queue extends ConfiguredObject public static final String QUEUE_FLOW_STOPPED = "queueFlowStopped"; public static final String SORT_KEY = "sortKey"; public static final String TYPE = "type"; - + public static final String PRIORITIES = "priorities"; @@ -132,7 +132,8 @@ public interface Queue extends ConfiguredObject ALERT_THRESHOLD_MESSAGE_SIZE, ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, - ALERT_REPEAT_GAP + ALERT_REPEAT_GAP, + PRIORITIES )); //children diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/QueueType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/QueueType.java new file mode 100644 index 0000000000..96f2a7e2e5 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/QueueType.java @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +public enum QueueType +{ + STANDARD, + PRIORITY, + LVQ, + SORTED +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java index 24df5e97fa..21c4aef323 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java @@ -58,9 +58,9 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, "x-qpid-capacity"); QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, "x-qpid-flow-resume-capacity"); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY, "qpid.sort_key"); - QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY, "qpid.last_value_queue_key"); - + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.SORT_KEY, AMQQueueFactory.QPID_QUEUE_SORT_KEY); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LAST_VALUE_QUEUE_KEY); + QueueAdapter.ATTRIBUTE_MAPPINGS.put(Queue.PRIORITIES, AMQQueueFactory.X_QPID_PRIORITIES); } private final AMQQueue _queue; @@ -454,7 +454,13 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs { return _queue.getDescription(); } - + else if(PRIORITIES.equals(name)) + { + if(_queue instanceof AMQPriorityQueue) + { + return ((AMQPriorityQueue)_queue).getPriorities(); + } + } return super.getAttribute(name); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index fa0293ad18..d802697d67 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -198,8 +198,7 @@ final class SessionAdapter extends AbstractAdapter implements Session } else if(name.equals(CONSUMER_COUNT)) { - final Collection<Consumer> subscriptions = getSubscriptions(); - return subscriptions == null ? 0 : subscriptions.size(); + return _session.getConsumerCount(); } else if(name.equals(LOCAL_TRANSACTION_BEGINS)) { @@ -207,12 +206,12 @@ final class SessionAdapter extends AbstractAdapter implements Session } else if(name.equals(LOCAL_TRANSACTION_OPEN)) { - long open = _session.getTxnCount() - (_session.getTxnCommits() + _session.getTxnRejects()); + long open = _session.getTxnStart() - (_session.getTxnCommits() + _session.getTxnRejects()); return (Boolean) (open > 0l); } else if(name.equals(LOCAL_TRANSACTION_ROLLBACKS)) { - return _session.getTxnCommits(); + return _session.getTxnRejects(); } else if(name.equals(STATE_CHANGED)) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 52cf806be2..bcfdb22fa9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.QueueType; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.UUIDGenerator; @@ -252,6 +253,31 @@ final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, E { attributes = new HashMap<String, Object>(attributes); + if (attributes.containsKey(Queue.TYPE)) + { + String typeAttribute = getStringAttribute(Queue.TYPE, attributes, null); + QueueType queueType = null; + try + { + queueType = QueueType.valueOf(typeAttribute.toUpperCase()); + } + catch(Exception e) + { + throw new IllegalArgumentException("Unsupported queue type :" + typeAttribute); + } + if (queueType == QueueType.LVQ && attributes.get(Queue.LVQ_KEY) == null) + { + attributes.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_LVQ_KEY); + } + else if (queueType == QueueType.PRIORITY && attributes.get(Queue.PRIORITIES) == null) + { + attributes.put(Queue.PRIORITIES, 10); + } + else if (queueType == QueueType.SORTED && attributes.get(Queue.SORT_KEY) == null) + { + throw new IllegalArgumentException("Sort key is not specified for sorted queue"); + } + } String name = getStringAttribute(Queue.NAME, attributes, null); State state = getEnumAttribute(State.class, Queue.STATE, attributes, State.ACTIVE); boolean durable = getBooleanAttribute(Queue.DURABLE, attributes, false); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index d18353682b..a8f62b0fa2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -84,4 +84,6 @@ public interface AMQSessionModel extends Comparable<AMQSessionModel> Long getTxnRejects(); int getChannelId(); + + int getConsumerCount(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index f0ba5646e0..5b5540897b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -1070,4 +1070,10 @@ public class ServerSession extends Session { return getQMFId().compareTo(session.getQMFId()); } + + @Override + public int getConsumerCount() + { + return _subscriptions.values().size(); + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 51d20b6d5f..8c5d2684ff 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -403,6 +403,12 @@ public class MockSubscription implements Subscription { return 0; } + + @Override + public int getConsumerCount() + { + return 0; + } } private static class MockConnectionModel implements AMQConnectionModel |
