diff options
| author | Robert Greig <rgreig@apache.org> | 2006-12-15 14:12:24 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2006-12-15 14:12:24 +0000 |
| commit | 75793016c7cc8fc156411041e4399243aadc563e (patch) | |
| tree | 418a201fa99bf12b32f5b77722ce92ed34e66e2f /java/broker | |
| parent | 6ca88beeea89e37ec725e5e99bada7ae48d2870f (diff) | |
| download | qpid-python-75793016c7cc8fc156411041e4399243aadc563e.tar.gz | |
QPID-183 Patch supplied by Rob Godfrey. Major changes to durable topic subscription handling.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487562 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
7 files changed, 302 insertions, 17 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index 6dc97f9e48..ffd0e5d8ae 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -197,4 +197,34 @@ public class DestNameExchange extends AbstractExchange } } } + + public boolean isBound(String routingKey, AMQQueue queue) throws AMQException + { + final List<AMQQueue> queues = _index.get(routingKey); + return queues != null && queues.contains(queue); + } + + public boolean isBound(String routingKey) throws AMQException + { + final List<AMQQueue> queues = _index.get(routingKey); + return queues != null && !queues.isEmpty(); + } + + public boolean isBound(AMQQueue queue) throws AMQException + { + Map<String, List<AMQQueue>> bindings = _index.getBindingsMap(); + for (List<AMQQueue> queues : bindings.values()) + { + if (queues.contains(queue)) + { + return true; + } + } + return false; + } + + public boolean hasBindings() throws AMQException + { + return !_index.getBindingsMap().isEmpty(); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index a692f9ebca..139307488e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -126,10 +126,11 @@ public class DestWildExchange extends AbstractExchange } // End of MBean class - public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException + public synchronized void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException { assert queue != null; assert routingKey != null; + _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey); // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>()); // if we got null back, no previous value was associated with the specified routing key hence @@ -159,6 +160,8 @@ public class DestWildExchange extends AbstractExchange // TODO: add support for the immediate flag if (queues == null) { + _logger.warn("No queues found for routing key " + routingKey); + _logger.warn("Routing map contains: " + _routingKey2queues); //todo Check for valid topic - mritchie return; } @@ -172,7 +175,37 @@ public class DestWildExchange extends AbstractExchange } } - public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException + public boolean isBound(String routingKey, AMQQueue queue) throws AMQException + { + List<AMQQueue> queues = _routingKey2queues.get(routingKey); + return queues != null && queues.contains(queue); + } + + + public boolean isBound(String routingKey) throws AMQException + { + List<AMQQueue> queues = _routingKey2queues.get(routingKey); + return queues != null && !queues.isEmpty(); + } + + public boolean isBound(AMQQueue queue) throws AMQException + { + for (List<AMQQueue> queues : _routingKey2queues.values()) + { + if (queues.contains(queue)) + { + return true; + } + } + return false; + } + + public boolean hasBindings() throws AMQException + { + return !_routingKey2queues.isEmpty(); + } + + public synchronized void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException { assert queue != null; assert routingKey != null; @@ -190,6 +223,10 @@ public class DestWildExchange extends AbstractExchange throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + " with routing key " + routingKey); } + if (queues.isEmpty()) + { + _routingKey2queues.remove(queues); + } } protected ExchangeMBean createMBean() throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 787d0eddfd..824e85dc5c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -47,4 +47,30 @@ public interface Exchange void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException; void route(AMQMessage message) throws AMQException; + + /** + * Determines whether a message would be isBound to a particular queue using a specific routing key + * @param routingKey + * @param queue + * @return + * @throws AMQException + */ + boolean isBound(String routingKey, AMQQueue queue) throws AMQException; + + /** + * Determines whether a message is routing to any queue using a specific routing key + * @param routingKey + * @return + * @throws AMQException + */ + boolean isBound(String routingKey) throws AMQException; + + boolean isBound(AMQQueue queue) throws AMQException; + + /** + * Returns true if this exchange has at least one binding associated with it. + * @return + * @throws AMQException + */ + boolean hasBindings() throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 3fa73aa2e2..8c4df68dea 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -221,6 +221,33 @@ public class HeadersExchange extends AbstractExchange } } + public boolean isBound(String routingKey, AMQQueue queue) throws AMQException + { + return isBound(queue); + } + + public boolean isBound(String routingKey) throws AMQException + { + return hasBindings(); + } + + public boolean isBound(AMQQueue queue) throws AMQException + { + for (Registration r : _bindings) + { + if (r.queue.equals(queue)) + { + return true; + } + } + return false; + } + + public boolean hasBindings() throws AMQException + { + return !_bindings.isEmpty(); + } + protected Map getHeaders(ContentHeaderBody contentHeaderFrame) { //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers, diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java index fb48729c9e..485c4739bd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,6 +24,7 @@ import org.apache.qpid.server.queue.AMQQueue; import java.util.List; import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -37,7 +38,7 @@ class Index private ConcurrentMap<String, List<AMQQueue>> _index = new ConcurrentHashMap<String, List<AMQQueue>>(); - boolean add(String key, AMQQueue queue) + synchronized boolean add(String key, AMQQueue queue) { List<AMQQueue> queues = _index.get(key); if(queues == null) @@ -61,7 +62,7 @@ class Index } } - boolean remove(String key, AMQQueue queue) + synchronized boolean remove(String key, AMQQueue queue) { List<AMQQueue> queues = _index.get(key); if (queues != null) @@ -83,6 +84,6 @@ class Index Map<String, List<AMQQueue>> getBindingsMap() { - return _index; + return new HashMap<String, List<AMQQueue>>(_index); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java new file mode 100644 index 0000000000..5aaf78d6b7 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java @@ -0,0 +1,163 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.handler; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.ExchangeBoundBody; +import org.apache.qpid.framing.ExchangeBoundOkBody; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQMethodEvent; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.StateAwareMethodListener; + +/** + * @author Apache Software Foundation + */ +public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody> +{ + private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler(); + + public static final int OK = 0; + + public static final int EXCHANGE_NOT_FOUND = 1; + + public static final int QUEUE_NOT_FOUND = 2; + + public static final int NO_BINDINGS = 3; + + public static final int QUEUE_NOT_BOUND = 4; + + public static final int NO_QUEUE_BOUND_WITH_RK = 5; + + public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6; + + public static ExchangeBoundHandler getInstance() + { + return _instance; + } + + private ExchangeBoundHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, + AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException + { + ExchangeBoundBody body = evt.getMethod(); + + String exchangeName = body.exchange; + String queueName = body.queue; + String routingKey = body.routingKey; + if (exchangeName == null) + { + throw new AMQException("Exchange exchange must not be null"); + } + Exchange exchange = exchangeRegistry.getExchange(exchangeName); + AMQFrame response; + if (exchange == null) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND, + "Exchange " + exchangeName + " not found"); + } + else if (routingKey == null) + { + if (queueName == null) + { + if (exchange.hasBindings()) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null); + } + else + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null); + } + } + else + { + AMQQueue queue = queueRegistry.getQueue(queueName); + if (queue == null) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND, + "Queue " + queueName + " not found"); + } + else + { + if (exchange.isBound(queue)) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null); + } + else + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND, + "Queue " + queueName + " not bound to exchange " + + exchangeName); + } + } + } + } + else if (queueName != null) + { + AMQQueue queue = queueRegistry.getQueue(queueName); + if (queue == null) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND, + "Queue " + queueName + " not found"); + } + else + { + if (exchange.isBound(body.routingKey, queue)) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, + null); + } + else + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, + "Queue " + queueName + + " not bound with routing key " + + body.routingKey + " to exchange " + + exchangeName); + } + } + } + else + { + if (exchange.isBound(body.routingKey)) + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, + null); + } + else + { + response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), + NO_QUEUE_BOUND_WITH_RK, + "No queue bound with routing key " + + body.routingKey + " to exchange " + + exchangeName); + } + } + protocolSession.writeFrame(response); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index 5e88ff7f2d..4e9deeb8db 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -109,6 +109,7 @@ public class AMQStateManager implements AMQMethodListener frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance()); frame2handlerMap.put(ExchangeDeclareBody.class, ExchangeDeclareHandler.getInstance()); frame2handlerMap.put(ExchangeDeleteBody.class, ExchangeDeleteHandler.getInstance()); + frame2handlerMap.put(ExchangeBoundBody.class, ExchangeBoundHandler.getInstance()); frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance()); frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance()); frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance()); |
