summaryrefslogtreecommitdiff
path: root/tests/test_split.py
diff options
context:
space:
mode:
authorVictor Uriarte <victor.m.uriarte@intel.com>2016-06-19 00:58:41 -0700
committerVictor Uriarte <victor.m.uriarte@intel.com>2016-06-20 07:41:13 -0700
commit5d50f349cda37986bb3704e8fe25d57c78e6047a (patch)
tree8c2b2061534767316e11f01a77bcf55c4c560889 /tests/test_split.py
parentc9e8230502fd2c72833a2ea4d2c6bac9234e580a (diff)
downloadsqlparse-5d50f349cda37986bb3704e8fe25d57c78e6047a.tar.gz
Remove some test classes and clean-up
Diffstat (limited to 'tests/test_split.py')
-rw-r--r--tests/test_split.py267
1 files changed, 140 insertions, 127 deletions
diff --git a/tests/test_split.py b/tests/test_split.py
index 5968806..8a2fe2d 100644
--- a/tests/test_split.py
+++ b/tests/test_split.py
@@ -10,133 +10,146 @@ import sqlparse
from sqlparse.compat import StringIO, text_type
-class SQLSplitTest(object):
- """Tests sqlparse.sqlsplit()."""
-
- _sql1 = 'select * from foo;'
- _sql2 = 'select * from bar;'
-
- def test_split_semicolon(self):
- sql2 = 'select * from foo where bar = \'foo;bar\';'
- stmts = sqlparse.parse(''.join([self._sql1, sql2]))
- assert len(stmts) == 2
- assert str(stmts[0]) == self._sql1
- assert str(stmts[1]) == sql2
-
- def test_split_backslash(self):
- stmts = sqlparse.parse(r"select '\\'; select '\''; select '\\\'';")
- assert len(stmts) == 3
-
- def test_create_function(self, load_file):
- sql = load_file('function.sql')
- stmts = sqlparse.parse(sql)
- assert len(stmts) == 1
- assert str(stmts[0]) == sql
-
- def test_create_function_psql(self, load_file):
- sql = load_file('function_psql.sql')
- stmts = sqlparse.parse(sql)
- assert len(stmts) == 1
- assert str(stmts[0]) == sql
-
- def test_create_function_psql3(self, load_file):
- sql = load_file('function_psql3.sql')
- stmts = sqlparse.parse(sql)
- assert len(stmts) == 1
- assert str(stmts[0]) == sql
-
- def test_create_function_psql2(self, load_file):
- sql = load_file('function_psql2.sql')
- stmts = sqlparse.parse(sql)
- assert len(stmts) == 1
- assert str(stmts[0]) == sql
-
- def test_dashcomments(self, load_file):
- sql = load_file('dashcomment.sql')
- stmts = sqlparse.parse(sql)
- assert len(stmts) == 3
- assert ''.join(str(q) for q in stmts) == sql
-
- def test_dashcomments_eol(self):
- stmts = sqlparse.parse('select foo; -- comment\n')
- assert len(stmts) == 1
- stmts = sqlparse.parse('select foo; -- comment\r')
- assert len(stmts) == 1
- stmts = sqlparse.parse('select foo; -- comment\r\n')
- assert len(stmts) == 1
- stmts = sqlparse.parse('select foo; -- comment')
- assert len(stmts) == 1
-
- def test_begintag(self, load_file):
- sql = load_file('begintag.sql')
- stmts = sqlparse.parse(sql)
- assert len(stmts) == 3
- assert ''.join(str(q) for q in stmts) == sql
-
- def test_begintag_2(self, load_file):
- sql = load_file('begintag_2.sql')
- stmts = sqlparse.parse(sql)
- assert len(stmts) == 1
- assert ''.join(str(q) for q in stmts) == sql
-
- def test_dropif(self):
- sql = 'DROP TABLE IF EXISTS FOO;\n\nSELECT * FROM BAR;'
- stmts = sqlparse.parse(sql)
- assert len(stmts) == 2
- assert ''.join(str(q) for q in stmts) == sql
-
- def test_comment_with_umlaut(self):
- sql = (u'select * from foo;\n'
- u'-- Testing an umlaut: ä\n'
- u'select * from bar;')
- stmts = sqlparse.parse(sql)
- assert len(stmts) == 2
- assert ''.join(str(q) for q in stmts) == sql
-
- def test_comment_end_of_line(self):
- sql = ('select * from foo; -- foo\n'
- 'select * from bar;')
- stmts = sqlparse.parse(sql)
- assert len(stmts) == 2
- assert ''.join(str(q) for q in stmts) == sql
- # make sure the comment belongs to first query
- assert str(stmts[0]) == 'select * from foo; -- foo\n'
-
- def test_casewhen(self):
- sql = ('SELECT case when val = 1 then 2 else null end as foo;\n'
- 'comment on table actor is \'The actor table.\';')
- stmts = sqlparse.split(sql)
- assert len(stmts) == 2
-
- def test_cursor_declare(self):
- sql = ('DECLARE CURSOR "foo" AS SELECT 1;\n'
- 'SELECT 2;')
- stmts = sqlparse.split(sql)
- assert len(stmts) == 2
-
- def test_if_function(self): # see issue 33
- # don't let IF as a function confuse the splitter
- sql = ('CREATE TEMPORARY TABLE tmp '
- 'SELECT IF(a=1, a, b) AS o FROM one; '
- 'SELECT t FROM two')
- stmts = sqlparse.split(sql)
- assert len(stmts) == 2
-
- def test_split_stream(self):
- stream = StringIO("SELECT 1; SELECT 2;")
- stmts = sqlparse.parsestream(stream)
- assert isinstance(stmts, types.GeneratorType)
- assert len(list(stmts)) == 2
-
- def test_encoding_parsestream(self):
- stream = StringIO("SELECT 1; SELECT 2;")
- stmts = list(sqlparse.parsestream(stream))
- assert isinstance(stmts[0].tokens[0].value, text_type)
-
- def test_unicode_parsestream(self):
- stream = StringIO(u"SELECT ö")
- stmts = list(sqlparse.parsestream(stream))
- assert str(stmts[0]) == "SELECT ö"
+def test_split_semicolon():
+ sql1 = 'select * from foo;'
+ sql2 = "select * from foo where bar = 'foo;bar';"
+ stmts = sqlparse.parse(''.join([sql1, sql2]))
+ assert len(stmts) == 2
+ assert str(stmts[0]) == sql1
+ assert str(stmts[1]) == sql2
+
+
+def test_split_backslash():
+ stmts = sqlparse.parse(r"select '\\'; select '\''; select '\\\'';")
+ assert len(stmts) == 3
+
+
+def test_split_create_function(load_file):
+ sql = load_file('function.sql')
+ stmts = sqlparse.parse(sql)
+ assert len(stmts) == 1
+ assert str(stmts[0]) == sql
+
+
+def test_split_create_function_psql(load_file):
+ sql = load_file('function_psql.sql')
+ stmts = sqlparse.parse(sql)
+ assert len(stmts) == 1
+ assert text_type(stmts[0]) == sql
+
+
+def test_split_create_function_psql3(load_file):
+ sql = load_file('function_psql3.sql')
+ stmts = sqlparse.parse(sql)
+ assert len(stmts) == 1
+ assert str(stmts[0]) == sql
+
+
+def test_split_create_function_psql2(load_file):
+ sql = load_file('function_psql2.sql')
+ stmts = sqlparse.parse(sql)
+ assert len(stmts) == 1
+ assert str(stmts[0]) == sql
+
+
+def test_split_dashcomments(load_file):
+ sql = load_file('dashcomment.sql')
+ stmts = sqlparse.parse(sql)
+ assert len(stmts) == 3
+ assert ''.join(str(q) for q in stmts) == sql
+
+
+def test_split_dashcomments_eol():
+ stmts = sqlparse.parse('select foo; -- comment\n')
+ assert len(stmts) == 1
+ stmts = sqlparse.parse('select foo; -- comment\r')
+ assert len(stmts) == 1
+ stmts = sqlparse.parse('select foo; -- comment\r\n')
+ assert len(stmts) == 1
+ stmts = sqlparse.parse('select foo; -- comment')
+ assert len(stmts) == 1
+
+
+def test_split_begintag(load_file):
+ sql = load_file('begintag.sql')
+ stmts = sqlparse.parse(sql)
+ assert len(stmts) == 3
+ assert ''.join(str(q) for q in stmts) == sql
+
+
+def test_split_begintag_2(load_file):
+ sql = load_file('begintag_2.sql')
+ stmts = sqlparse.parse(sql)
+ assert len(stmts) == 1
+ assert ''.join(str(q) for q in stmts) == sql
+
+
+def test_split_dropif():
+ sql = 'DROP TABLE IF EXISTS FOO;\n\nSELECT * FROM BAR;'
+ stmts = sqlparse.parse(sql)
+ assert len(stmts) == 2
+ assert ''.join(str(q) for q in stmts) == sql
+
+
+def test_split_comment_with_umlaut():
+ sql = (u'select * from foo;\n'
+ u'-- Testing an umlaut: ä\n'
+ u'select * from bar;')
+ stmts = sqlparse.parse(sql)
+ assert len(stmts) == 2
+ assert ''.join(text_type(q) for q in stmts) == sql
+
+
+def test_split_comment_end_of_line():
+ sql = ('select * from foo; -- foo\n'
+ 'select * from bar;')
+ stmts = sqlparse.parse(sql)
+ assert len(stmts) == 2
+ assert ''.join(str(q) for q in stmts) == sql
+ # make sure the comment belongs to first query
+ assert str(stmts[0]) == 'select * from foo; -- foo\n'
+
+
+def test_split_casewhen():
+ sql = ("SELECT case when val = 1 then 2 else null end as foo;\n"
+ "comment on table actor is 'The actor table.';")
+ stmts = sqlparse.split(sql)
+ assert len(stmts) == 2
+
+
+def test_split_cursor_declare():
+ sql = ('DECLARE CURSOR "foo" AS SELECT 1;\n'
+ 'SELECT 2;')
+ stmts = sqlparse.split(sql)
+ assert len(stmts) == 2
+
+
+def test_split_if_function(): # see issue 33
+ # don't let IF as a function confuse the splitter
+ sql = ('CREATE TEMPORARY TABLE tmp '
+ 'SELECT IF(a=1, a, b) AS o FROM one; '
+ 'SELECT t FROM two')
+ stmts = sqlparse.split(sql)
+ assert len(stmts) == 2
+
+
+def test_split_stream():
+ stream = StringIO("SELECT 1; SELECT 2;")
+ stmts = sqlparse.parsestream(stream)
+ assert isinstance(stmts, types.GeneratorType)
+ assert len(list(stmts)) == 2
+
+
+def test_split_encoding_parsestream():
+ stream = StringIO("SELECT 1; SELECT 2;")
+ stmts = list(sqlparse.parsestream(stream))
+ assert isinstance(stmts[0].tokens[0].value, text_type)
+
+
+def test_split_unicode_parsestream():
+ stream = StringIO(u'SELECT ö')
+ stmts = list(sqlparse.parsestream(stream))
+ assert str(stmts[0]) == 'SELECT ö'
def test_split_simple():