diff options
Diffstat (limited to 'python/qpid/tests/messaging/address.py')
-rw-r--r-- | python/qpid/tests/messaging/address.py | 321 |
1 files changed, 0 insertions, 321 deletions
diff --git a/python/qpid/tests/messaging/address.py b/python/qpid/tests/messaging/address.py deleted file mode 100644 index aa9562a717..0000000000 --- a/python/qpid/tests/messaging/address.py +++ /dev/null @@ -1,321 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - - -from qpid.tests import Test -from qpid.messaging.address import lex, parse, ParseError, EOF, ID, NUMBER, \ - SYM, WSPACE, LEXER -from qpid.lexer import Token -from qpid.harness import Skipped -from qpid.tests.parser import ParserBase - -def indent(st): - return " " + st.replace("\n", "\n ") - -def pprint_address(name, subject, options): - return "NAME: %s\nSUBJECT: %s\nOPTIONS: %s" % \ - (pprint(name), pprint(subject), pprint(options)) - -def pprint(o): - if isinstance(o, dict): - return pprint_map(o) - elif isinstance(o, list): - return pprint_list(o) - elif isinstance(o, basestring): - return pprint_string(o) - else: - return repr(o) - -def pprint_map(m): - items = ["%s: %s" % (pprint(k), pprint(v)) for k, v in m.items()] - items.sort() - return pprint_items("{", items, "}") - -def pprint_list(l): - return pprint_items("[", [pprint(x) for x in l], "]") - -def pprint_items(start, items, end): - if items: - return "%s\n%s\n%s" % (start, ",\n".join([indent(i) for i in items]), end) - else: - return "%s%s" % (start, end) - -def pprint_string(s): - result = "'" - for c in s: - if c == "'": - result += "\\'" - elif c == "\n": - result += "\\n" - elif ord(c) >= 0x80: - result += "\\u%04x" % ord(c) - else: - result += c - result += "'" - return result - -class AddressTests(ParserBase, Test): - - EXCLUDE = (WSPACE, EOF) - - def fields(self, line, n): - result = line.split(":", n - 1) - result.extend([None]*(n - len(result))) - return result - - def call(self, parser, mode, input): - try: - from subprocess import Popen, PIPE, STDOUT - po = Popen([parser, mode], stdin=PIPE, stdout=PIPE, stderr=STDOUT) - except ImportError, e: - raise Skipped("%s" % e) - except OSError, e: - raise Skipped("%s: %s" % (e, parser)) - out, _ = po.communicate(input=input) - return out - - def parser(self): - return self.config.defines.get("address.parser") - - def do_lex(self, st): - parser = self.parser() - if parser: - out = self.call(parser, "lex", st) - lines = out.split("\n") - toks = [] - for line in lines: - if line.strip(): - name, position, value = self.fields(line, 3) - toks.append(Token(LEXER.type(name), value, position, st)) - return toks - else: - return lex(st) - - def do_parse(self, st): - return parse(st) - - def valid(self, addr, name=None, subject=None, options=None): - parser = self.parser() - if parser: - got = self.call(parser, "parse", addr) - expected = "%s\n" % pprint_address(name, subject, options) - assert expected == got, "expected\n<EXP>%s</EXP>\ngot\n<GOT>%s</GOT>" % (expected, got) - else: - ParserBase.valid(self, addr, (name, subject, options)) - - def invalid(self, addr, error=None): - parser = self.parser() - if parser: - got = self.call(parser, "parse", addr) - expected = "ERROR: %s\n" % error - assert expected == got, "expected %r, got %r" % (expected, got) - else: - ParserBase.invalid(self, addr, error) - - def testDashInId1(self): - self.lex("foo-bar", ID) - - def testDashInId2(self): - self.lex("foo-3", ID) - - def testDashAlone1(self): - self.lex("foo - bar", ID, SYM, ID) - - def testDashAlone2(self): - self.lex("foo - 3", ID, SYM, NUMBER) - - def testLeadingDash(self): - self.lex("-foo", SYM, ID) - - def testTrailingDash(self): - self.lex("foo-", ID, SYM) - - def testNegativeNum(self): - self.lex("-3", NUMBER) - - def testIdNum(self): - self.lex("id1", ID) - - def testIdSpaceNum(self): - self.lex("id 1", ID, NUMBER) - - def testHash(self): - self.valid("foo/bar.#", "foo", "bar.#") - - def testStar(self): - self.valid("foo/bar.*", "foo", "bar.*") - - def testColon(self): - self.valid("foo.bar/baz.qux:moo:arf", "foo.bar", "baz.qux:moo:arf") - - def testOptions(self): - self.valid("foo.bar/baz.qux:moo:arf; {key: value}", - "foo.bar", "baz.qux:moo:arf", {"key": "value"}) - - def testOptionsTrailingComma(self): - self.valid("name/subject; {key: value,}", "name", "subject", - {"key": "value"}) - - def testOptionsNone(self): - self.valid("name/subject; {key: None}", "name", "subject", - {"key": None}) - - def testSemiSubject(self): - self.valid("foo.bar/'baz.qux;moo:arf'; {key: value}", - "foo.bar", "baz.qux;moo:arf", {"key": "value"}) - - def testCommaSubject(self): - self.valid("foo.bar/baz.qux.{moo,arf}", "foo.bar", "baz.qux.{moo,arf}") - - def testCommaSubjectOptions(self): - self.valid("foo.bar/baz.qux.{moo,arf}; {key: value}", "foo.bar", - "baz.qux.{moo,arf}", {"key": "value"}) - - def testUnbalanced(self): - self.valid("foo.bar/baz.qux.{moo,arf; {key: value}", "foo.bar", - "baz.qux.{moo,arf", {"key": "value"}) - - def testSlashQuote(self): - self.valid("foo.bar\\/baz.qux.{moo,arf; {key: value}", - "foo.bar/baz.qux.{moo,arf", - None, {"key": "value"}) - - def testSlashHexEsc1(self): - self.valid("foo.bar\\x00baz.qux.{moo,arf; {key: value}", - "foo.bar\x00baz.qux.{moo,arf", - None, {"key": "value"}) - - def testSlashHexEsc2(self): - self.valid("foo.bar\\xffbaz.qux.{moo,arf; {key: value}", - "foo.bar\xffbaz.qux.{moo,arf", - None, {"key": "value"}) - - def testSlashHexEsc3(self): - self.valid("foo.bar\\xFFbaz.qux.{moo,arf; {key: value}", - "foo.bar\xFFbaz.qux.{moo,arf", - None, {"key": "value"}) - - def testSlashUnicode1(self): - self.valid("foo.bar\\u1234baz.qux.{moo,arf; {key: value}", - u"foo.bar\u1234baz.qux.{moo,arf", None, {"key": "value"}) - - def testSlashUnicode2(self): - self.valid("foo.bar\\u0000baz.qux.{moo,arf; {key: value}", - u"foo.bar\u0000baz.qux.{moo,arf", None, {"key": "value"}) - - def testSlashUnicode3(self): - self.valid("foo.bar\\uffffbaz.qux.{moo,arf; {key: value}", - u"foo.bar\uffffbaz.qux.{moo,arf", None, {"key": "value"}) - - def testSlashUnicode4(self): - self.valid("foo.bar\\uFFFFbaz.qux.{moo,arf; {key: value}", - u"foo.bar\uFFFFbaz.qux.{moo,arf", None, {"key": "value"}) - - def testNoName(self): - self.invalid("; {key: value}", - "unexpected token SEMI(;) line:1,0:; {key: value}") - - def testEmpty(self): - self.invalid("", "unexpected token EOF line:1,0:") - - def testNoNameSlash(self): - self.invalid("/asdf; {key: value}", - "unexpected token SLASH(/) line:1,0:/asdf; {key: value}") - - def testBadOptions1(self): - self.invalid("name/subject; {", - "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), " - "got EOF line:1,15:name/subject; {") - - def testBadOptions2(self): - self.invalid("name/subject; { 3", - "expecting COLON, got EOF " - "line:1,17:name/subject; { 3") - - def testBadOptions3(self): - self.invalid("name/subject; { key:", - "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), got EOF " - "line:1,20:name/subject; { key:") - - def testBadOptions4(self): - self.invalid("name/subject; { key: value", - "expecting (COMMA, RBRACE), got EOF " - "line:1,26:name/subject; { key: value") - - def testBadOptions5(self): - self.invalid("name/subject; { key: value asdf", - "expecting (COMMA, RBRACE), got ID(asdf) " - "line:1,27:name/subject; { key: value asdf") - - def testBadOptions6(self): - self.invalid("name/subject; { key: value,", - "expecting (NUMBER, STRING, ID, LBRACE, LBRACK, RBRACE), got EOF " - "line:1,27:name/subject; { key: value,") - - def testBadOptions7(self): - self.invalid("name/subject; { key: value } asdf", - "expecting EOF, got ID(asdf) " - "line:1,29:name/subject; { key: value } asdf") - - def testList1(self): - self.valid("name/subject; { key: [] }", "name", "subject", {"key": []}) - - def testList2(self): - self.valid("name/subject; { key: ['one'] }", "name", "subject", {"key": ['one']}) - - def testList3(self): - self.valid("name/subject; { key: [1, 2, 3] }", "name", "subject", - {"key": [1, 2, 3]}) - - def testList4(self): - self.valid("name/subject; { key: [1, [2, 3], 4] }", "name", "subject", - {"key": [1, [2, 3], 4]}) - - def testBadList1(self): - self.invalid("name/subject; { key: [ }", "expecting (NUMBER, STRING, ID, LBRACE, LBRACK), " - "got RBRACE(}) line:1,23:name/subject; { key: [ }") - - def testBadList2(self): - self.invalid("name/subject; { key: [ 1 }", "expecting (COMMA, RBRACK), " - "got RBRACE(}) line:1,25:name/subject; { key: [ 1 }") - - def testBadList3(self): - self.invalid("name/subject; { key: [ 1 2 }", "expecting (COMMA, RBRACK), " - "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 }") - - def testBadList4(self): - self.invalid("name/subject; { key: [ 1 2 ] }", "expecting (COMMA, RBRACK), " - "got NUMBER(2) line:1,25:name/subject; { key: [ 1 2 ] }") - - def testMap1(self): - self.valid("name/subject; { 'key': value }", - "name", "subject", {"key": "value"}) - - def testMap2(self): - self.valid("name/subject; { 1: value }", "name", "subject", {1: "value"}) - - def testMap3(self): - self.valid('name/subject; { "foo.bar": value }', - "name", "subject", {"foo.bar": "value"}) - - def testBoolean(self): - self.valid("name/subject; { true1: True, true2: true, " - "false1: False, false2: false }", - "name", "subject", {"true1": True, "true2": True, - "false1": False, "false2": False}) |