diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2008-02-27 05:17:07 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2008-02-27 05:17:07 +0000 |
| commit | b0c194956862fd21c45bdfa7c552d18f16932491 (patch) | |
| tree | f470535f5b2d418d1389d07a6a0bb0e9ec88a5e1 /java | |
| parent | e4218487f49a2685bc58da7994f1d8477ccae393 (diff) | |
| download | qpid-python-b0c194956862fd21c45bdfa7c552d18f16932491.tar.gz | |
Added a new parser for the BindingURL.
This allows adding multiple binding keys, using # and * in the URLs.
This is tracked via QPID-814
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@631489 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 506 insertions, 132 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java index 529a05b2e2..42f125920c 100644 --- a/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java +++ b/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java @@ -20,28 +20,29 @@ */ package org.apache.qpid.url; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; + import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashMap; - public class AMQBindingURL implements BindingURL { private static final Logger _logger = LoggerFactory.getLogger(AMQBindingURL.class); String _url; - AMQShortString _exchangeClass; - AMQShortString _exchangeName; - AMQShortString _destinationName; - AMQShortString _queueName; + AMQShortString _exchangeClass = ExchangeDefaults.DIRECT_EXCHANGE_CLASS; + AMQShortString _exchangeName = new AMQShortString(""); + AMQShortString _destinationName = new AMQShortString("");; + AMQShortString _queueName = new AMQShortString(""); + AMQShortString[] _bindingKeys = new AMQShortString[0]; private HashMap<String, String> _options; - public AMQBindingURL(String url) throws URLSyntaxException + public AMQBindingURL(String url) throws URISyntaxException { // format: // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* @@ -52,116 +53,35 @@ public class AMQBindingURL implements BindingURL parseBindingURL(); } - private void parseBindingURL() throws URLSyntaxException + private void parseBindingURL() throws URISyntaxException { - try - { - URI connection = new URI(_url); - - String exchangeClass = connection.getScheme(); - - if (exchangeClass == null) - { - _url = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + "://" + "" + "//" + _url; - // URLHelper.parseError(-1, "Exchange Class not specified.", _url); - parseBindingURL(); - - return; - } - else - { - setExchangeClass(exchangeClass); - } - - String exchangeName = connection.getHost(); - - if (exchangeName == null) - { - if (getExchangeClass().equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)) - { - setExchangeName(""); - } - else - { - throw URLHelper.parseError(-1, "Exchange Name not specified.", _url); - } - } - else - { - setExchangeName(exchangeName); - } - - String queueName; - - if ((connection.getPath() == null) || connection.getPath().equals("")) - { - throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(), - "Destination or Queue requried", _url); - } - else - { - int slash = connection.getPath().indexOf("/", 1); - if (slash == -1) - { - throw URLHelper.parseError(_url.indexOf(_exchangeName.toString()) + _exchangeName.length(), - "Destination requried", _url); - } - else - { - String path = connection.getPath(); - setDestinationName(path.substring(1, slash)); - - // We don't set queueName yet as the actual value we use depends on options set - // when we are dealing with durable subscriptions - - queueName = path.substring(slash + 1); - - } - } - - URLHelper.parseOptions(_options, connection.getQuery()); - - processOptions(); - - // We can now call setQueueName as the URL is full parsed. - - setQueueName(queueName); - - // Fragment is #string (not used) - _logger.debug("URL Parsed: " + this); - - } - catch (URISyntaxException uris) - { - - throw URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput()); - - } + BindingURLParser parser = new BindingURLParser(_url,this); + processOptions(); + _logger.debug("URL Parsed: " + this); } - private void setExchangeClass(String exchangeClass) + public void setExchangeClass(String exchangeClass) { setExchangeClass(new AMQShortString(exchangeClass)); } - private void setQueueName(String name) throws URLSyntaxException + public void setQueueName(String name) { setQueueName(new AMQShortString(name)); } - private void setDestinationName(String name) + public void setDestinationName(String name) { setDestinationName(new AMQShortString(name)); } - private void setExchangeName(String exchangeName) + public void setExchangeName(String exchangeName) { setExchangeName(new AMQShortString(exchangeName)); } - private void processOptions() + private void processOptions() throws URISyntaxException { - // this is where we would parse any options that needed more than just storage. } public String getURL() @@ -210,34 +130,9 @@ public class AMQBindingURL implements BindingURL return _queueName; } - public void setQueueName(AMQShortString name) throws URLSyntaxException + public void setQueueName(AMQShortString name) { - if (_exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) - { - if (Boolean.parseBoolean(getOption(OPTION_DURABLE))) - { - if (containsOption(BindingURL.OPTION_CLIENTID) && containsOption(BindingURL.OPTION_SUBSCRIPTION)) - { - _queueName = - new AMQShortString(getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION)); - } - else - { - throw URLHelper.parseError(-1, "Durable subscription must have values for " + BindingURL.OPTION_CLIENTID - + " and " + BindingURL.OPTION_SUBSCRIPTION + ".", _url); - - } - } - else - { - _queueName = null; - } - } - else - { - _queueName = name; - } - + _queueName = name; } public String getOption(String key) @@ -261,7 +156,7 @@ public class AMQBindingURL implements BindingURL { if (containsOption(BindingURL.OPTION_ROUTING_KEY)) { - return new AMQShortString(getOption(OPTION_ROUTING_KEY)); + return new AMQShortString((String)getOption(OPTION_ROUTING_KEY)); } else { @@ -271,12 +166,29 @@ public class AMQBindingURL implements BindingURL if (containsOption(BindingURL.OPTION_ROUTING_KEY)) { - return new AMQShortString(getOption(OPTION_ROUTING_KEY)); + return new AMQShortString((String)getOption(OPTION_ROUTING_KEY)); } return getDestinationName(); } + public AMQShortString[] getBindingKeys() + { + if (_bindingKeys != null && _bindingKeys.length>0) + { + return _bindingKeys; + } + else + { + return new AMQShortString[]{getRoutingKey()}; + } + } + + public void setBindingKeys(AMQShortString[] keys) + { + _bindingKeys = keys; + } + public void setRoutingKey(AMQShortString key) { setOption(OPTION_ROUTING_KEY, key.toString()); @@ -296,6 +208,21 @@ public class AMQBindingURL implements BindingURL sb.append(URLHelper.printOptions(_options)); - return sb.toString(); + // temp hack + if (sb.toString().indexOf("?") == -1) + { + sb.append("?"); + } + else + { + sb.append("&"); + } + + for (AMQShortString key :_bindingKeys) + { + sb.append("bindingKey='").append(key.toString()).append("'&"); + } + + return sb.toString().substring(0,sb.toString().length()-1); } } diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java index 67be2db86f..c4346d721b 100644 --- a/java/common/src/main/java/org/apache/qpid/url/BindingURL.java +++ b/java/common/src/main/java/org/apache/qpid/url/BindingURL.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,6 +34,7 @@ public interface BindingURL public static final String OPTION_CLIENTID = "clientid"; public static final String OPTION_SUBSCRIPTION = "subscription"; public static final String OPTION_ROUTING_KEY = "routingkey"; + public static final String OPTION_BINDING_KEY = "bindingKey"; String getURL(); @@ -52,5 +53,7 @@ public interface BindingURL AMQShortString getRoutingKey(); + AMQShortString[] getBindingKeys(); + String toString(); } diff --git a/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java b/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java new file mode 100644 index 0000000000..f7e0b53cb1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/url/BindingURLParser.java @@ -0,0 +1,444 @@ +package org.apache.qpid.url; + +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BindingURLParser +{ + private static final char PROPERTY_EQUALS_CHAR = '='; + private static final char PROPERTY_SEPARATOR_CHAR = '&'; + private static final char ALTERNATIVE_PROPERTY_SEPARATOR_CHAR = ','; + private static final char FORWARD_SLASH_CHAR = '/'; + private static final char QUESTION_MARK_CHAR = '?'; + private static final char SINGLE_QUOTE_CHAR = '\''; + private static final char COLON_CHAR = ':'; + private static final char END_OF_URL_MARKER_CHAR = '%'; + + private static final Logger _logger = LoggerFactory.getLogger(BindingURLImpl.class); + + private char[] _url; + private AMQBindingURL _bindingURL; + private BindingURLParserState _currentParserState; + private String _error; + private int _index = 0; + private String _currentPropName; + private Map<String,Object> _options = new HashMap<String,Object>(); + + //<exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + public BindingURLParser(String url,AMQBindingURL bindingURL) throws URISyntaxException + { + _url = (url + END_OF_URL_MARKER_CHAR).toCharArray(); + _bindingURL = bindingURL; + _currentParserState = BindingURLParserState.BINDING_URL_START; + BindingURLParserState prevState = _currentParserState; + + try + { + while (_currentParserState != BindingURLParserState.ERROR && _currentParserState != BindingURLParserState.BINDING_URL_END) + { + prevState = _currentParserState; + _currentParserState = next(); + } + + if (_currentParserState == BindingURLParserState.ERROR) + { + _error = + "Invalid URL format [current_state = " + prevState + ", details parsed so far " + _bindingURL + " ] error at (" + _index + ") due to " + _error; + _logger.debug(_error); + URISyntaxException ex; + ex = new URISyntaxException(markErrorLocation(),"Error occured while parsing URL",_index); + throw ex; + } + + processOptions(); + } + catch (ArrayIndexOutOfBoundsException e) + { + _error = "Invalid URL format [current_state = " + prevState + ", details parsed so far " + _bindingURL + " ] error at (" + _index + ")"; + URISyntaxException ex = new URISyntaxException(markErrorLocation(),"Error occured while parsing URL",_index); + ex.initCause(e); + throw ex; + } + } + + enum BindingURLParserState + { + BINDING_URL_START, + EXCHANGE_CLASS, + COLON_CHAR, + DOUBLE_SEP, + EXCHANGE_NAME, + EXCHANGE_SEPERATOR_CHAR, + DESTINATION, + DESTINATION_SEPERATOR_CHAR, + QUEUE_NAME, + QUESTION_MARK_CHAR, + PROPERTY_NAME, + PROPERTY_EQUALS, + START_PROPERTY_VALUE, + PROPERTY_VALUE, + END_PROPERTY_VALUE, + PROPERTY_SEPARATOR, + BINDING_URL_END, + ERROR + } + + /** + * I am fully ware that there are few optimizations + * that can speed up things a wee bit. But I have opted + * for readability and maintainability at the expense of + * speed, as speed is not a critical factor here. + * + * One can understand the full parse logic by just looking at this method. + */ + private BindingURLParserState next() + { + switch (_currentParserState) + { + case BINDING_URL_START: + return extractExchangeClass(); + case COLON_CHAR: + _index++; //skip ":" + return BindingURLParserState.DOUBLE_SEP; + case DOUBLE_SEP: + _index = _index + 2; //skip "//" + return BindingURLParserState.EXCHANGE_NAME; + case EXCHANGE_NAME: + return extractExchangeName(); + case EXCHANGE_SEPERATOR_CHAR: + _index++; // skip '/' + return BindingURLParserState.DESTINATION; + case DESTINATION: + return extractDestination(); + case DESTINATION_SEPERATOR_CHAR: + _index++; // skip '/' + return BindingURLParserState.QUEUE_NAME; + case QUEUE_NAME: + return extractQueueName(); + case QUESTION_MARK_CHAR: + _index++; // skip '?' + return BindingURLParserState.PROPERTY_NAME; + case PROPERTY_NAME: + return extractPropertyName(); + case PROPERTY_EQUALS: + _index++; // skip the equal sign + return BindingURLParserState.START_PROPERTY_VALUE; + case START_PROPERTY_VALUE: + _index++; // skip the '\'' + return BindingURLParserState.PROPERTY_VALUE; + case PROPERTY_VALUE: + return extractPropertyValue(); + case END_PROPERTY_VALUE: + _index ++; + return checkEndOfURL(); + case PROPERTY_SEPARATOR: + _index++; // skip '&' + return BindingURLParserState.PROPERTY_NAME; + default: + return BindingURLParserState.ERROR; + } + } + + private BindingURLParserState extractExchangeClass() + { + char nextChar = _url[_index]; + + // check for the following special cases. + // "myQueue?durable='true'" or just "myQueue"; + + StringBuilder builder = new StringBuilder(); + while (nextChar != COLON_CHAR && nextChar != QUESTION_MARK_CHAR && nextChar != END_OF_URL_MARKER_CHAR) + { + builder.append(nextChar); + _index++; + nextChar = _url[_index]; + } + + // normal use case + if (nextChar == COLON_CHAR) + { + _bindingURL.setExchangeClass(builder.toString()); + return BindingURLParserState.COLON_CHAR; + } + // "myQueue?durable='true'" use case + else if (nextChar == QUESTION_MARK_CHAR) + { + _bindingURL.setExchangeClass(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString()); + _bindingURL.setExchangeName(""); + _bindingURL.setQueueName(builder.toString()); + return BindingURLParserState.QUESTION_MARK_CHAR; + } + else + { + _bindingURL.setExchangeClass(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString()); + _bindingURL.setExchangeName(""); + _bindingURL.setQueueName(builder.toString()); + return BindingURLParserState.BINDING_URL_END; + } + } + + private BindingURLParserState extractExchangeName() + { + char nextChar = _url[_index]; + StringBuilder builder = new StringBuilder(); + while (nextChar != FORWARD_SLASH_CHAR) + { + builder.append(nextChar); + _index++; + nextChar = _url[_index]; + } + + _bindingURL.setExchangeName(builder.toString()); + return BindingURLParserState.EXCHANGE_SEPERATOR_CHAR; + } + + private BindingURLParserState extractDestination() + { + char nextChar = _url[_index]; + + //The destination is and queue name are both optional + // This is checking for the case where both are not specified. + if (nextChar == QUESTION_MARK_CHAR) + { + return BindingURLParserState.QUESTION_MARK_CHAR; + } + + StringBuilder builder = new StringBuilder(); + while (nextChar != FORWARD_SLASH_CHAR && nextChar != QUESTION_MARK_CHAR) + { + builder.append(nextChar); + _index++; + nextChar = _url[_index]; + } + + // This is the case where the destination is explictily stated. + // ex direct://amq.direct/myDest/myQueue?option1='1' ... OR + // direct://amq.direct//myQueue?option1='1' ... + if (nextChar == FORWARD_SLASH_CHAR) + { + _bindingURL.setDestinationName(builder.toString()); + return BindingURLParserState.DESTINATION_SEPERATOR_CHAR; + } + // This is the case where destination is not explictly stated. + // ex direct://amq.direct/myQueue?option1='1' ... + else + { + _bindingURL.setQueueName(builder.toString()); + return BindingURLParserState.QUESTION_MARK_CHAR; + } + } + + private BindingURLParserState extractQueueName() + { + char nextChar = _url[_index]; + StringBuilder builder = new StringBuilder(); + while (nextChar != QUESTION_MARK_CHAR && nextChar != END_OF_URL_MARKER_CHAR) + { + builder.append(nextChar); + nextChar = _url[++_index]; + } + _bindingURL.setQueueName(builder.toString()); + + if(nextChar == QUESTION_MARK_CHAR) + { + return BindingURLParserState.QUESTION_MARK_CHAR; + } + else + { + return BindingURLParserState.BINDING_URL_END; + } + } + + private BindingURLParserState extractPropertyName() + { + StringBuilder builder = new StringBuilder(); + char next = _url[_index]; + while (next != PROPERTY_EQUALS_CHAR) + { + builder.append(next); + next = _url[++_index]; + } + _currentPropName = builder.toString(); + + if (_currentPropName.trim().equals("")) + { + _error = "Property name cannot be empty"; + return BindingURLParserState.ERROR; + } + + return BindingURLParserState.PROPERTY_EQUALS; + } + + private BindingURLParserState extractPropertyValue() + { + StringBuilder builder = new StringBuilder(); + char next = _url[_index]; + while (next != SINGLE_QUOTE_CHAR) + { + builder.append(next); + next = _url[++_index]; + } + String propValue = builder.toString(); + + if (propValue.trim().equals("")) + { + _error = "Property values cannot be empty"; + return BindingURLParserState.ERROR; + } + else + { + if (_options.containsKey(_currentPropName)) + { + Object obj = _options.get(_currentPropName); + if (obj instanceof List) + { + List list = (List)obj; + list.add(propValue); + } + else // it has to be a string + { + List<String> list = new ArrayList(); + list.add((String)obj); + list.add(propValue); + _options.put(_currentPropName, list); + } + } + else + { + _options.put(_currentPropName, propValue); + } + + + return BindingURLParserState.END_PROPERTY_VALUE; + } + } + + private BindingURLParserState checkEndOfURL() + { + char nextChar = _url[_index]; + if ( nextChar == END_OF_URL_MARKER_CHAR) + { + return BindingURLParserState.BINDING_URL_END; + } + else if (nextChar == PROPERTY_SEPARATOR_CHAR || nextChar == ALTERNATIVE_PROPERTY_SEPARATOR_CHAR) + { + return BindingURLParserState.PROPERTY_SEPARATOR; + } + else + { + return BindingURLParserState.ERROR; + } + } + + private String markErrorLocation() + { + String tmp = String.valueOf(_url); + // length -1 to remove ENDOF URL marker + return tmp.substring(0,_index) + "^" + tmp.substring(_index+1> tmp.length()-1?tmp.length()-1:_index+1,tmp.length()-1); + } + + private void processOptions() throws URISyntaxException + { +// check for bindingKey + if (_options.containsKey(BindingURL.OPTION_BINDING_KEY) && _options.get(BindingURL.OPTION_BINDING_KEY) != null) + { + Object obj = _options.get(BindingURL.OPTION_BINDING_KEY); + + if (obj instanceof String) + { + AMQShortString[] bindingKeys = new AMQShortString[]{new AMQShortString((String)obj)}; + _bindingURL.setBindingKeys(bindingKeys); + } + else // it would be a list + { + List list = (List)obj; + AMQShortString[] bindingKeys = new AMQShortString[list.size()]; + int i=0; + for (Iterator it = list.iterator(); it.hasNext();) + { + bindingKeys[i] = new AMQShortString((String)it.next()); + i++; + } + _bindingURL.setBindingKeys(bindingKeys); + } + + } + for (String key: _options.keySet()) + { + // We want to skip the bindingKey list + if (_options.get(key) instanceof String) + { + _bindingURL.setOption(key, (String)_options.get(key)); + } + } + + + // check if both a binding key and a routing key is specified. + if (_options.containsKey(BindingURL.OPTION_BINDING_KEY) && _options.containsKey(BindingURL.OPTION_ROUTING_KEY)) + { + throw new URISyntaxException(String.valueOf(_url),"It is illegal to specify both a routingKey and a bindingKey in the same URL",-1); + } + + // check for durable subscriptions + if (_bindingURL.getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)) + { + String queueName = null; + if (Boolean.parseBoolean(_bindingURL.getOption(BindingURL.OPTION_DURABLE))) + { + if (_bindingURL.containsOption(BindingURL.OPTION_CLIENTID) && _bindingURL.containsOption(BindingURL.OPTION_SUBSCRIPTION)) + { + queueName = _bindingURL.getOption(BindingURL.OPTION_CLIENTID + ":" + BindingURL.OPTION_SUBSCRIPTION); + } + else + { + throw new URISyntaxException(String.valueOf(_url),"Durable subscription must have values for " + BindingURL.OPTION_CLIENTID + + " and " + BindingURL.OPTION_SUBSCRIPTION , -1); + + } + } + _bindingURL.setQueueName(queueName); + } + } + + public static void main(String[] args) + { + String[] urls = new String[] + { + "topic://amq.topic//myTopic?routingkey='stocks.#'", + "topic://amq.topic/message_queue?bindingKey='usa.*'&bindingKey='control',exclusive='true'", + "topic://amq.topic//?bindingKey='usa.*',bindingKey='control',exclusive='true'", + "direct://amq.direct/dummyDest/myQueue?routingKey='abc.*'", + "exchange.Class://exchangeName/Destination/Queue", + "exchangeClass://exchangeName/Destination/?option='value',option2='value2'", + "IBMPerfQueue1?durable='true'", + "exchangeClass://exchangeName/Destination/?bindingKey='key1',bindingKey='key2'" + }; + + try + { + for (String url: urls) + { + System.out.println("URL " + url); + AMQBindingURL bindingURL = new AMQBindingURL(url); + BindingURLParser parser = new BindingURLParser(url,bindingURL); + System.out.println("\nX " + bindingURL.toString() + " \n"); + + } + + } + catch(Exception e) + { + e.printStackTrace(); + } + } + +} |
