summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java34
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java45
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java30
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java163
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java5
7 files changed, 302 insertions, 17 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
index 6dc97f9e48..ffd0e5d8ae 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -197,4 +197,34 @@ public class DestNameExchange extends AbstractExchange
}
}
}
+
+ public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+ {
+ final List<AMQQueue> queues = _index.get(routingKey);
+ return queues != null && queues.contains(queue);
+ }
+
+ public boolean isBound(String routingKey) throws AMQException
+ {
+ final List<AMQQueue> queues = _index.get(routingKey);
+ return queues != null && !queues.isEmpty();
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+ Map<String, List<AMQQueue>> bindings = _index.getBindingsMap();
+ for (List<AMQQueue> queues : bindings.values())
+ {
+ if (queues.contains(queue))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return !_index.getBindingsMap().isEmpty();
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
index a692f9ebca..139307488e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -126,10 +126,11 @@ public class DestWildExchange extends AbstractExchange
} // End of MBean class
- public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ public synchronized void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
assert routingKey != null;
+ _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey);
// we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
// if we got null back, no previous value was associated with the specified routing key hence
@@ -159,6 +160,8 @@ public class DestWildExchange extends AbstractExchange
// TODO: add support for the immediate flag
if (queues == null)
{
+ _logger.warn("No queues found for routing key " + routingKey);
+ _logger.warn("Routing map contains: " + _routingKey2queues);
//todo Check for valid topic - mritchie
return;
}
@@ -172,7 +175,37 @@ public class DestWildExchange extends AbstractExchange
}
}
- public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+ public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+ {
+ List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ return queues != null && queues.contains(queue);
+ }
+
+
+ public boolean isBound(String routingKey) throws AMQException
+ {
+ List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+ return queues != null && !queues.isEmpty();
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+ for (List<AMQQueue> queues : _routingKey2queues.values())
+ {
+ if (queues.contains(queue))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return !_routingKey2queues.isEmpty();
+ }
+
+ public synchronized void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
{
assert queue != null;
assert routingKey != null;
@@ -190,6 +223,10 @@ public class DestWildExchange extends AbstractExchange
throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
" with routing key " + routingKey);
}
+ if (queues.isEmpty())
+ {
+ _routingKey2queues.remove(queues);
+ }
}
protected ExchangeMBean createMBean() throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 787d0eddfd..824e85dc5c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -47,4 +47,30 @@ public interface Exchange
void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException;
void route(AMQMessage message) throws AMQException;
+
+ /**
+ * Determines whether a message would be isBound to a particular queue using a specific routing key
+ * @param routingKey
+ * @param queue
+ * @return
+ * @throws AMQException
+ */
+ boolean isBound(String routingKey, AMQQueue queue) throws AMQException;
+
+ /**
+ * Determines whether a message is routing to any queue using a specific routing key
+ * @param routingKey
+ * @return
+ * @throws AMQException
+ */
+ boolean isBound(String routingKey) throws AMQException;
+
+ boolean isBound(AMQQueue queue) throws AMQException;
+
+ /**
+ * Returns true if this exchange has at least one binding associated with it.
+ * @return
+ * @throws AMQException
+ */
+ boolean hasBindings() throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 3fa73aa2e2..8c4df68dea 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -221,6 +221,33 @@ public class HeadersExchange extends AbstractExchange
}
}
+ public boolean isBound(String routingKey, AMQQueue queue) throws AMQException
+ {
+ return isBound(queue);
+ }
+
+ public boolean isBound(String routingKey) throws AMQException
+ {
+ return hasBindings();
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+ for (Registration r : _bindings)
+ {
+ if (r.queue.equals(queue))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return !_bindings.isEmpty();
+ }
+
protected Map getHeaders(ContentHeaderBody contentHeaderFrame)
{
//what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
index fb48729c9e..485c4739bd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import java.util.List;
import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -37,7 +38,7 @@ class Index
private ConcurrentMap<String, List<AMQQueue>> _index
= new ConcurrentHashMap<String, List<AMQQueue>>();
- boolean add(String key, AMQQueue queue)
+ synchronized boolean add(String key, AMQQueue queue)
{
List<AMQQueue> queues = _index.get(key);
if(queues == null)
@@ -61,7 +62,7 @@ class Index
}
}
- boolean remove(String key, AMQQueue queue)
+ synchronized boolean remove(String key, AMQQueue queue)
{
List<AMQQueue> queues = _index.get(key);
if (queues != null)
@@ -83,6 +84,6 @@ class Index
Map<String, List<AMQQueue>> getBindingsMap()
{
- return _index;
+ return new HashMap<String, List<AMQQueue>>(_index);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
new file mode 100644
index 0000000000..5aaf78d6b7
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ExchangeBoundBody;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody>
+{
+ private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler();
+
+ public static final int OK = 0;
+
+ public static final int EXCHANGE_NOT_FOUND = 1;
+
+ public static final int QUEUE_NOT_FOUND = 2;
+
+ public static final int NO_BINDINGS = 3;
+
+ public static final int QUEUE_NOT_BOUND = 4;
+
+ public static final int NO_QUEUE_BOUND_WITH_RK = 5;
+
+ public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
+
+ public static ExchangeBoundHandler getInstance()
+ {
+ return _instance;
+ }
+
+ private ExchangeBoundHandler()
+ {
+ }
+
+ public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+ AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
+ {
+ ExchangeBoundBody body = evt.getMethod();
+
+ String exchangeName = body.exchange;
+ String queueName = body.queue;
+ String routingKey = body.routingKey;
+ if (exchangeName == null)
+ {
+ throw new AMQException("Exchange exchange must not be null");
+ }
+ Exchange exchange = exchangeRegistry.getExchange(exchangeName);
+ AMQFrame response;
+ if (exchange == null)
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND,
+ "Exchange " + exchangeName + " not found");
+ }
+ else if (routingKey == null)
+ {
+ if (queueName == null)
+ {
+ if (exchange.hasBindings())
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null);
+ }
+ }
+ else
+ {
+ AMQQueue queue = queueRegistry.getQueue(queueName);
+ if (queue == null)
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
+ "Queue " + queueName + " not found");
+ }
+ else
+ {
+ if (exchange.isBound(queue))
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND,
+ "Queue " + queueName + " not bound to exchange " +
+ exchangeName);
+ }
+ }
+ }
+ }
+ else if (queueName != null)
+ {
+ AMQQueue queue = queueRegistry.getQueue(queueName);
+ if (queue == null)
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
+ "Queue " + queueName + " not found");
+ }
+ else
+ {
+ if (exchange.isBound(body.routingKey, queue))
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
+ null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,
+ "Queue " + queueName +
+ " not bound with routing key " +
+ body.routingKey + " to exchange " +
+ exchangeName);
+ }
+ }
+ }
+ else
+ {
+ if (exchange.isBound(body.routingKey))
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
+ null);
+ }
+ else
+ {
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ NO_QUEUE_BOUND_WITH_RK,
+ "No queue bound with routing key " +
+ body.routingKey + " to exchange " +
+ exchangeName);
+ }
+ }
+ protocolSession.writeFrame(response);
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
index 5e88ff7f2d..4e9deeb8db 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -109,6 +109,7 @@ public class AMQStateManager implements AMQMethodListener
frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
frame2handlerMap.put(ExchangeDeclareBody.class, ExchangeDeclareHandler.getInstance());
frame2handlerMap.put(ExchangeDeleteBody.class, ExchangeDeleteHandler.getInstance());
+ frame2handlerMap.put(ExchangeBoundBody.class, ExchangeBoundHandler.getInstance());
frame2handlerMap.put(BasicAckBody.class, BasicAckMethodHandler.getInstance());
frame2handlerMap.put(BasicRecoverBody.class, BasicRecoverMethodHandler.getInstance());
frame2handlerMap.put(BasicConsumeBody.class, BasicConsumeMethodHandler.getInstance());