diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-12-07 16:47:53 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-12-07 16:47:53 +0000 |
| commit | b6d5bbc102bd2053c99e911322c27099b0f895cd (patch) | |
| tree | 9d5b4ebadec507536dcd4248d272e002ae26bd56 /java/client/src | |
| parent | 7bbd0fdd577b167127633a7b52fe7ea487b1f267 (diff) | |
| download | qpid-python-b6d5bbc102bd2053c99e911322c27099b0f895cd.tar.gz | |
QPID-2242 : Update to the 0-8/9 code path to use the 0-10 static lookup tables for the Destination type when JMS_QPID_DESTTYPE has not been set.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@887994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
4 files changed, 211 insertions, 109 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java index 7f43e4b4b2..314508805d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java @@ -22,7 +22,6 @@ package org.apache.qpid.client.message; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.framing.BasicContentHeaderProperties; import javax.jms.Destination; import javax.jms.JMSException; diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index b233bc6d2b..1479f06632 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -41,12 +41,13 @@ import javax.jms.MessageNotWriteableException; import javax.jms.MessageFormatException; import javax.jms.DeliveryMode; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.net.URISyntaxException; -import java.nio.charset.Charset; -import org.apache.qpid.exchange.ExchangeDefaults; -public class AMQMessageDelegate_0_10 implements AMQMessageDelegate +/** + * This extends AbstractAMQMessageDelegate which contains common code between + * both the 0_8 and 0_10 Message types. + * + */ +public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { private static final Map<ReplyTo, Destination> _destinationCache = Collections.synchronizedMap(new ReferenceMap()); @@ -64,27 +65,6 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate private AMQSession _session; private final long _deliveryTag; - private static Map<AMQShortString,Integer> _exchangeTypeMap = new ConcurrentHashMap<AMQShortString, Integer>(); - private static Map<String,Integer> _exchangeTypeStringMap = new ConcurrentHashMap<String, Integer>(); - private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>();; - - static - { - _exchangeTypeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, AMQDestination.QUEUE_TYPE); - _exchangeTypeMap.put(AMQShortString.EMPTY_STRING, AMQDestination.QUEUE_TYPE); - _exchangeTypeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE); - _exchangeTypeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE); - - _exchangeTypeStringMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), AMQDestination.QUEUE_TYPE); - _exchangeTypeStringMap.put("", AMQDestination.QUEUE_TYPE); - _exchangeTypeStringMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); - _exchangeTypeStringMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); - - - _exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.toString(), AMQDestination.QUEUE_TYPE); - _exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); - _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); - } protected AMQMessageDelegate_0_10() { @@ -92,80 +72,49 @@ public class AMQMessageDelegate_0_10 implements AMQMessageDelegate _readableProperties = false; } - private AMQDestination generateDestination(AMQShortString exchange, AMQShortString routingKey) - { - AMQDestination dest; - switch(getExchangeType(exchange)) - { - case AMQDestination.QUEUE_TYPE: - dest = new AMQQueue(exchange, routingKey, routingKey); - break; - case AMQDestination.TOPIC_TYPE: - dest = new AMQTopic(exchange, routingKey, null); - break; - default: - dest = new AMQUndefinedDestination(exchange, routingKey, null); - - } - - return dest; - } - - private int getExchangeType(AMQShortString exchange) + protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag) { - Integer type = _exchangeTypeMap.get(exchange == null ? AMQShortString.EMPTY_STRING : exchange); - - if(type == null) - { - return AMQDestination.UNKNOWN_TYPE; - } + _messageProps = messageProps; + _deliveryProps = deliveryProps; + _deliveryTag = deliveryTag; + _readableProperties = (_messageProps != null); + AMQDestination dest; - return type; + dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()), + new AMQShortString(_deliveryProps.getRoutingKey())); + setJMSDestination(dest); } - + /** + * Use the 0-10 ExchangeQuery call to validate the exchange type. + * + * This is used primarily to provide the correct JMSDestination value. + * + * The query is performed synchronously iff the map exchange is not already + * present in the exchange Map. + * + * @param header The message headers, from which the exchange name can be extracted + * @param session The 0-10 session to use to call ExchangeQuery + */ public static void updateExchangeTypeMapping(Header header, org.apache.qpid.transport.Session session) { DeliveryProperties deliveryProps = header.get(DeliveryProperties.class); - if(deliveryProps != null) + if (deliveryProps != null) { String exchange = deliveryProps.getExchange(); - if(exchange != null && !_exchangeTypeStringMap.containsKey(exchange)) + if (exchange != null && !exchangeMapContains(exchange)) { - - AMQShortString exchangeShortString = new AMQShortString(exchange); Future<ExchangeQueryResult> future = - session.exchangeQuery(exchange.toString()); + session.exchangeQuery(exchange.toString()); ExchangeQueryResult res = future.get(); - Integer type = _exchangeTypeToDestinationType.get(res.getType()); - if(type == null) - { - type = AMQDestination.UNKNOWN_TYPE; - } - _exchangeTypeStringMap.put(exchange, type); - _exchangeTypeMap.put(exchangeShortString, type); - + updateExchangeType(exchange, res.getType()); } } } - protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag) - { - _messageProps = messageProps; - _deliveryProps = deliveryProps; - _deliveryTag = deliveryTag; - _readableProperties = (_messageProps != null); - - AMQDestination dest; - - dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()), - new AMQShortString(_deliveryProps.getRoutingKey())); - setJMSDestination(dest); - } - public String getJMSMessageID() throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index 902de08d3f..5157632280 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -46,7 +46,7 @@ import java.util.Enumeration; import java.util.UUID; import java.net.URISyntaxException; -public class AMQMessageDelegate_0_8 implements AMQMessageDelegate +public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap()); @@ -65,6 +65,16 @@ public class AMQMessageDelegate_0_8 implements AMQMessageDelegate private AMQSession _session; private final long _deliveryTag; + // The base set of items that needs to be set. + private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag) + { + _contentHeaderProperties = properties; + _deliveryTag = deliveryTag; + _readableProperties = (_contentHeaderProperties != null); + _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); + } + + // Used for the creation of new messages protected AMQMessageDelegate_0_8() { this(new BasicContentHeaderProperties(), -1); @@ -73,6 +83,7 @@ public class AMQMessageDelegate_0_8 implements AMQMessageDelegate } + // Used when generating a received message object protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey) { @@ -80,41 +91,33 @@ public class AMQMessageDelegate_0_8 implements AMQMessageDelegate Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName()); - if(type == null) + AMQDestination dest = null; + + // If we have a type set the attempt to use that. + if (type != null) { - type = AMQDestination.UNKNOWN_TYPE; + switch (type.intValue()) + { + case AMQDestination.QUEUE_TYPE: + dest = new AMQQueue(exchange, routingKey, routingKey); + break; + case AMQDestination.TOPIC_TYPE: + dest = new AMQTopic(exchange, routingKey, null); + break; + default: + // Use the generateDestination method + dest = null; + } } - AMQDestination dest; - - switch(type.intValue()) + if (dest == null) { - case AMQDestination.QUEUE_TYPE: - dest = new AMQQueue(exchange, routingKey, routingKey); - break; - case AMQDestination.TOPIC_TYPE: - dest = new AMQTopic(exchange, routingKey, null); - break; - default: - dest = new AMQUndefinedDestination(exchange, routingKey, null); + dest = generateDestination(exchange, routingKey); } - - - // Destination dest = AMQDestination.createDestination(url); setJMSDestination(dest); - - - } - protected AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag) - { - _contentHeaderProperties = properties; - _deliveryTag = deliveryTag; - _readableProperties = (_contentHeaderProperties != null); - _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); - } public String getJMSMessageID() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java new file mode 100644 index 0000000000..534fb19b76 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java @@ -0,0 +1,151 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.message; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQUndefinedDestination; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This abstract class provides exchange lookup functionality that is shared + * between all MessageDelegates. Update facilities are provided so that the 0-10 + * code base can update the mappings. The 0-8 code base does not have the + * facility to update the exchange map so it can only use the default mappings. + * + * That said any updates that a 0-10 client performs will also benefit any 0-8 + * connections in this VM. + * + */ +public abstract class AbstractAMQMessageDelegate implements AMQMessageDelegate +{ + + private static Map<AMQShortString, Integer> _exchangeTypeMap = new ConcurrentHashMap<AMQShortString, Integer>(); + private static Map<String, Integer> _exchangeTypeStringMap = new ConcurrentHashMap<String, Integer>(); + private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>(); + + /** + * Add default Mappings for the Direct, Default, Topic and Fanout exchanges. + */ + static + { + _exchangeTypeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, AMQDestination.QUEUE_TYPE); + _exchangeTypeMap.put(AMQShortString.EMPTY_STRING, AMQDestination.QUEUE_TYPE); + _exchangeTypeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE); + _exchangeTypeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE); + + _exchangeTypeStringMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), AMQDestination.QUEUE_TYPE); + _exchangeTypeStringMap.put("", AMQDestination.QUEUE_TYPE); + _exchangeTypeStringMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); + _exchangeTypeStringMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); + + _exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), AMQDestination.QUEUE_TYPE); + _exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); + _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE); + } + + /** + * Called when a Destination is requried. + * + * This will create the AMQDestination that is the correct type and value + * based on the incomming values. + * @param exchange The exchange name + * @param routingKey The routing key to be used for the Destination + * @return AMQDestination of the correct subtype + */ + protected AMQDestination generateDestination(AMQShortString exchange, AMQShortString routingKey) + { + AMQDestination dest; + switch (getExchangeType(exchange)) + { + case AMQDestination.QUEUE_TYPE: + dest = new AMQQueue(exchange, routingKey, routingKey); + break; + case AMQDestination.TOPIC_TYPE: + dest = new AMQTopic(exchange, routingKey, null); + break; + default: + dest = new AMQUndefinedDestination(exchange, routingKey, null); + } + + return dest; + } + + /** + * Update the exchange name to type mapping. + * + * If the newType is not known then an UNKNOWN_TYPE is created. Only if the + * exchange is of a known type: amq.direct, amq.topic, amq.fanout can we + * create a suitable AMQDestination representation + * + * @param exchange the name of the exchange + * @param newtype the AMQP exchange class name i.e. amq.direct + */ + protected static void updateExchangeType(String exchange, String newtype) + { + Integer type = _exchangeTypeToDestinationType.get(newtype); + if (type == null) + { + type = AMQDestination.UNKNOWN_TYPE; + } + _exchangeTypeStringMap.put(exchange, type); + _exchangeTypeMap.put(new AMQShortString(exchange), type); + } + + /** + * Accessor method to allow lookups of the given exchange name. + * + * This check allows the prevention of extra work required such as asking + * the broker for the exchange class name. + * + * @param exchange the exchange name to lookup + * @return true if there is a mapping for this exchange + */ + protected static boolean exchangeMapContains(String exchange) + { + return _exchangeTypeStringMap.containsKey(exchange); + } + + /** + * Returns an int representing the exchange type. This is used in the + * createDestination method to ensure the correct AMQDestiation is created. + * + * @param exchange the exchange name to lookup + * @return int representing the Exchange type + */ + private int getExchangeType(AMQShortString exchange) + { + Integer type = _exchangeTypeMap.get(exchange == null ? AMQShortString.EMPTY_STRING : exchange); + + if (type == null) + { + return AMQDestination.UNKNOWN_TYPE; + } + + return type; + } + +} |
