diff options
| author | Victor Uriarte <victor.m.uriarte@intel.com> | 2016-06-19 00:58:41 -0700 |
|---|---|---|
| committer | Victor Uriarte <victor.m.uriarte@intel.com> | 2016-06-20 07:41:13 -0700 |
| commit | 5d50f349cda37986bb3704e8fe25d57c78e6047a (patch) | |
| tree | 8c2b2061534767316e11f01a77bcf55c4c560889 /tests/test_split.py | |
| parent | c9e8230502fd2c72833a2ea4d2c6bac9234e580a (diff) | |
| download | sqlparse-5d50f349cda37986bb3704e8fe25d57c78e6047a.tar.gz | |
Remove some test classes and clean-up
Diffstat (limited to 'tests/test_split.py')
| -rw-r--r-- | tests/test_split.py | 267 |
1 files changed, 140 insertions, 127 deletions
diff --git a/tests/test_split.py b/tests/test_split.py index 5968806..8a2fe2d 100644 --- a/tests/test_split.py +++ b/tests/test_split.py @@ -10,133 +10,146 @@ import sqlparse from sqlparse.compat import StringIO, text_type -class SQLSplitTest(object): - """Tests sqlparse.sqlsplit().""" - - _sql1 = 'select * from foo;' - _sql2 = 'select * from bar;' - - def test_split_semicolon(self): - sql2 = 'select * from foo where bar = \'foo;bar\';' - stmts = sqlparse.parse(''.join([self._sql1, sql2])) - assert len(stmts) == 2 - assert str(stmts[0]) == self._sql1 - assert str(stmts[1]) == sql2 - - def test_split_backslash(self): - stmts = sqlparse.parse(r"select '\\'; select '\''; select '\\\'';") - assert len(stmts) == 3 - - def test_create_function(self, load_file): - sql = load_file('function.sql') - stmts = sqlparse.parse(sql) - assert len(stmts) == 1 - assert str(stmts[0]) == sql - - def test_create_function_psql(self, load_file): - sql = load_file('function_psql.sql') - stmts = sqlparse.parse(sql) - assert len(stmts) == 1 - assert str(stmts[0]) == sql - - def test_create_function_psql3(self, load_file): - sql = load_file('function_psql3.sql') - stmts = sqlparse.parse(sql) - assert len(stmts) == 1 - assert str(stmts[0]) == sql - - def test_create_function_psql2(self, load_file): - sql = load_file('function_psql2.sql') - stmts = sqlparse.parse(sql) - assert len(stmts) == 1 - assert str(stmts[0]) == sql - - def test_dashcomments(self, load_file): - sql = load_file('dashcomment.sql') - stmts = sqlparse.parse(sql) - assert len(stmts) == 3 - assert ''.join(str(q) for q in stmts) == sql - - def test_dashcomments_eol(self): - stmts = sqlparse.parse('select foo; -- comment\n') - assert len(stmts) == 1 - stmts = sqlparse.parse('select foo; -- comment\r') - assert len(stmts) == 1 - stmts = sqlparse.parse('select foo; -- comment\r\n') - assert len(stmts) == 1 - stmts = sqlparse.parse('select foo; -- comment') - assert len(stmts) == 1 - - def test_begintag(self, load_file): - sql = load_file('begintag.sql') - stmts = sqlparse.parse(sql) - assert len(stmts) == 3 - assert ''.join(str(q) for q in stmts) == sql - - def test_begintag_2(self, load_file): - sql = load_file('begintag_2.sql') - stmts = sqlparse.parse(sql) - assert len(stmts) == 1 - assert ''.join(str(q) for q in stmts) == sql - - def test_dropif(self): - sql = 'DROP TABLE IF EXISTS FOO;\n\nSELECT * FROM BAR;' - stmts = sqlparse.parse(sql) - assert len(stmts) == 2 - assert ''.join(str(q) for q in stmts) == sql - - def test_comment_with_umlaut(self): - sql = (u'select * from foo;\n' - u'-- Testing an umlaut: ä\n' - u'select * from bar;') - stmts = sqlparse.parse(sql) - assert len(stmts) == 2 - assert ''.join(str(q) for q in stmts) == sql - - def test_comment_end_of_line(self): - sql = ('select * from foo; -- foo\n' - 'select * from bar;') - stmts = sqlparse.parse(sql) - assert len(stmts) == 2 - assert ''.join(str(q) for q in stmts) == sql - # make sure the comment belongs to first query - assert str(stmts[0]) == 'select * from foo; -- foo\n' - - def test_casewhen(self): - sql = ('SELECT case when val = 1 then 2 else null end as foo;\n' - 'comment on table actor is \'The actor table.\';') - stmts = sqlparse.split(sql) - assert len(stmts) == 2 - - def test_cursor_declare(self): - sql = ('DECLARE CURSOR "foo" AS SELECT 1;\n' - 'SELECT 2;') - stmts = sqlparse.split(sql) - assert len(stmts) == 2 - - def test_if_function(self): # see issue 33 - # don't let IF as a function confuse the splitter - sql = ('CREATE TEMPORARY TABLE tmp ' - 'SELECT IF(a=1, a, b) AS o FROM one; ' - 'SELECT t FROM two') - stmts = sqlparse.split(sql) - assert len(stmts) == 2 - - def test_split_stream(self): - stream = StringIO("SELECT 1; SELECT 2;") - stmts = sqlparse.parsestream(stream) - assert isinstance(stmts, types.GeneratorType) - assert len(list(stmts)) == 2 - - def test_encoding_parsestream(self): - stream = StringIO("SELECT 1; SELECT 2;") - stmts = list(sqlparse.parsestream(stream)) - assert isinstance(stmts[0].tokens[0].value, text_type) - - def test_unicode_parsestream(self): - stream = StringIO(u"SELECT ö") - stmts = list(sqlparse.parsestream(stream)) - assert str(stmts[0]) == "SELECT ö" +def test_split_semicolon(): + sql1 = 'select * from foo;' + sql2 = "select * from foo where bar = 'foo;bar';" + stmts = sqlparse.parse(''.join([sql1, sql2])) + assert len(stmts) == 2 + assert str(stmts[0]) == sql1 + assert str(stmts[1]) == sql2 + + +def test_split_backslash(): + stmts = sqlparse.parse(r"select '\\'; select '\''; select '\\\'';") + assert len(stmts) == 3 + + +def test_split_create_function(load_file): + sql = load_file('function.sql') + stmts = sqlparse.parse(sql) + assert len(stmts) == 1 + assert str(stmts[0]) == sql + + +def test_split_create_function_psql(load_file): + sql = load_file('function_psql.sql') + stmts = sqlparse.parse(sql) + assert len(stmts) == 1 + assert text_type(stmts[0]) == sql + + +def test_split_create_function_psql3(load_file): + sql = load_file('function_psql3.sql') + stmts = sqlparse.parse(sql) + assert len(stmts) == 1 + assert str(stmts[0]) == sql + + +def test_split_create_function_psql2(load_file): + sql = load_file('function_psql2.sql') + stmts = sqlparse.parse(sql) + assert len(stmts) == 1 + assert str(stmts[0]) == sql + + +def test_split_dashcomments(load_file): + sql = load_file('dashcomment.sql') + stmts = sqlparse.parse(sql) + assert len(stmts) == 3 + assert ''.join(str(q) for q in stmts) == sql + + +def test_split_dashcomments_eol(): + stmts = sqlparse.parse('select foo; -- comment\n') + assert len(stmts) == 1 + stmts = sqlparse.parse('select foo; -- comment\r') + assert len(stmts) == 1 + stmts = sqlparse.parse('select foo; -- comment\r\n') + assert len(stmts) == 1 + stmts = sqlparse.parse('select foo; -- comment') + assert len(stmts) == 1 + + +def test_split_begintag(load_file): + sql = load_file('begintag.sql') + stmts = sqlparse.parse(sql) + assert len(stmts) == 3 + assert ''.join(str(q) for q in stmts) == sql + + +def test_split_begintag_2(load_file): + sql = load_file('begintag_2.sql') + stmts = sqlparse.parse(sql) + assert len(stmts) == 1 + assert ''.join(str(q) for q in stmts) == sql + + +def test_split_dropif(): + sql = 'DROP TABLE IF EXISTS FOO;\n\nSELECT * FROM BAR;' + stmts = sqlparse.parse(sql) + assert len(stmts) == 2 + assert ''.join(str(q) for q in stmts) == sql + + +def test_split_comment_with_umlaut(): + sql = (u'select * from foo;\n' + u'-- Testing an umlaut: ä\n' + u'select * from bar;') + stmts = sqlparse.parse(sql) + assert len(stmts) == 2 + assert ''.join(text_type(q) for q in stmts) == sql + + +def test_split_comment_end_of_line(): + sql = ('select * from foo; -- foo\n' + 'select * from bar;') + stmts = sqlparse.parse(sql) + assert len(stmts) == 2 + assert ''.join(str(q) for q in stmts) == sql + # make sure the comment belongs to first query + assert str(stmts[0]) == 'select * from foo; -- foo\n' + + +def test_split_casewhen(): + sql = ("SELECT case when val = 1 then 2 else null end as foo;\n" + "comment on table actor is 'The actor table.';") + stmts = sqlparse.split(sql) + assert len(stmts) == 2 + + +def test_split_cursor_declare(): + sql = ('DECLARE CURSOR "foo" AS SELECT 1;\n' + 'SELECT 2;') + stmts = sqlparse.split(sql) + assert len(stmts) == 2 + + +def test_split_if_function(): # see issue 33 + # don't let IF as a function confuse the splitter + sql = ('CREATE TEMPORARY TABLE tmp ' + 'SELECT IF(a=1, a, b) AS o FROM one; ' + 'SELECT t FROM two') + stmts = sqlparse.split(sql) + assert len(stmts) == 2 + + +def test_split_stream(): + stream = StringIO("SELECT 1; SELECT 2;") + stmts = sqlparse.parsestream(stream) + assert isinstance(stmts, types.GeneratorType) + assert len(list(stmts)) == 2 + + +def test_split_encoding_parsestream(): + stream = StringIO("SELECT 1; SELECT 2;") + stmts = list(sqlparse.parsestream(stream)) + assert isinstance(stmts[0].tokens[0].value, text_type) + + +def test_split_unicode_parsestream(): + stream = StringIO(u'SELECT ö') + stmts = list(sqlparse.parsestream(stream)) + assert str(stmts[0]) == 'SELECT ö' def test_split_simple(): |
