diff options
Diffstat (limited to 'test/sql/test_operators.py')
-rw-r--r-- | test/sql/test_operators.py | 198 |
1 files changed, 196 insertions, 2 deletions
diff --git a/test/sql/test_operators.py b/test/sql/test_operators.py index bb4cb1bf1..fbbdd7b62 100644 --- a/test/sql/test_operators.py +++ b/test/sql/test_operators.py @@ -12,8 +12,9 @@ from sqlalchemy import exc from sqlalchemy.engine import default from sqlalchemy.sql.elements import _literal_as_text from sqlalchemy.schema import Column, Table, MetaData +from sqlalchemy.sql import compiler from sqlalchemy.types import TypeEngine, TypeDecorator, UserDefinedType, \ - Boolean, NullType, MatchType + Boolean, NullType, MatchType, Indexable from sqlalchemy.dialects import mysql, firebird, postgresql, oracle, \ sqlite, mssql from sqlalchemy import util @@ -21,7 +22,6 @@ import datetime import collections from sqlalchemy import text, literal_column from sqlalchemy import and_, not_, between, or_ -from sqlalchemy.sql import true, false, null class LoopOperate(operators.ColumnOperators): @@ -577,6 +577,200 @@ class ExtensionOperatorTest(fixtures.TestBase, testing.AssertsCompiledSQL): ) +class IndexableTest(fixtures.TestBase, testing.AssertsCompiledSQL): + def setUp(self): + class MyTypeCompiler(compiler.GenericTypeCompiler): + def visit_mytype(self, type, **kw): + return "MYTYPE" + + def visit_myothertype(self, type, **kw): + return "MYOTHERTYPE" + + class MyCompiler(compiler.SQLCompiler): + def visit_slice(self, element, **kw): + return "%s:%s" % ( + self.process(element.start, **kw), + self.process(element.stop, **kw), + ) + + def visit_getitem_binary(self, binary, operator, **kw): + return "%s[%s]" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw) + ) + + class MyDialect(default.DefaultDialect): + statement_compiler = MyCompiler + type_compiler = MyTypeCompiler + + class MyType(Indexable, TypeEngine): + __visit_name__ = 'mytype' + + def __init__(self, zero_indexes=False, dimensions=1): + if zero_indexes: + self.zero_indexes = zero_indexes + self.dimensions = dimensions + + class Comparator(Indexable.Comparator): + def _setup_getitem(self, index): + if isinstance(index, slice): + return_type = self.type + elif self.type.dimensions is None or \ + self.type.dimensions == 1: + return_type = Integer() + else: + adapt_kw = {'dimensions': self.type.dimensions - 1} + # this is also testing the behavior of adapt() + # that we can pass kw that override constructor kws. + # required a small change to util.constructor_copy(). + return_type = self.type.adapt( + self.type.__class__, **adapt_kw) + + return operators.getitem, index, return_type + comparator_factory = Comparator + + self.MyType = MyType + self.__dialect__ = MyDialect() + + def test_setup_getitem_w_dims(self): + """test the behavior of the _setup_getitem() method given a simple + 'dimensions' scheme - this is identical to postgresql.ARRAY.""" + + col = Column('x', self.MyType(dimensions=3)) + + is_( + col[5].type._type_affinity, self.MyType + ) + eq_( + col[5].type.dimensions, 2 + ) + is_( + col[5][6].type._type_affinity, self.MyType + ) + eq_( + col[5][6].type.dimensions, 1 + ) + is_( + col[5][6][7].type._type_affinity, Integer + ) + + def test_getindex_literal(self): + + col = Column('x', self.MyType()) + + self.assert_compile( + col[5], + "x[:x_1]", + checkparams={'x_1': 5} + ) + + def test_getindex_sqlexpr(self): + + col = Column('x', self.MyType()) + col2 = Column('y', Integer()) + + self.assert_compile( + col[col2], + "x[y]", + checkparams={} + ) + + self.assert_compile( + col[col2 + 8], + "x[(y + :y_1)]", + checkparams={'y_1': 8} + ) + + def test_getslice_literal(self): + + col = Column('x', self.MyType()) + + self.assert_compile( + col[5:6], + "x[:x_1::x_2]", + checkparams={'x_1': 5, 'x_2': 6} + ) + + def test_getslice_sqlexpr(self): + + col = Column('x', self.MyType()) + col2 = Column('y', Integer()) + + self.assert_compile( + col[col2:col2 + 5], + "x[y:y + :y_1]", + checkparams={'y_1': 5} + ) + + def test_getindex_literal_zeroind(self): + + col = Column('x', self.MyType(zero_indexes=True)) + + self.assert_compile( + col[5], + "x[:x_1]", + checkparams={'x_1': 6} + ) + + def test_getindex_sqlexpr_zeroind(self): + + col = Column('x', self.MyType(zero_indexes=True)) + col2 = Column('y', Integer()) + + self.assert_compile( + col[col2], + "x[(y + :y_1)]", + checkparams={'y_1': 1} + ) + + self.assert_compile( + col[col2 + 8], + "x[(y + :y_1 + :param_1)]", + checkparams={'y_1': 8, 'param_1': 1} + ) + + def test_getslice_literal_zeroind(self): + + col = Column('x', self.MyType(zero_indexes=True)) + + self.assert_compile( + col[5:6], + "x[:x_1::x_2]", + checkparams={'x_1': 6, 'x_2': 7} + ) + + def test_getslice_sqlexpr_zeroind(self): + + col = Column('x', self.MyType(zero_indexes=True)) + col2 = Column('y', Integer()) + + self.assert_compile( + col[col2:col2 + 5], + "x[y + :y_1:y + :y_2 + :param_1]", + checkparams={'y_1': 1, 'y_2': 5, 'param_1': 1} + ) + + def test_override_operators(self): + special_index_op = operators.custom_op('->') + + class MyOtherType(Indexable, TypeEngine): + __visit_name__ = 'myothertype' + + class Comparator(TypeEngine.Comparator): + + def _adapt_expression(self, op, other_comparator): + return special_index_op, MyOtherType() + + comparator_factory = Comparator + + col = Column('x', MyOtherType()) + self.assert_compile( + col[5], + "x -> :x_1", + checkparams={'x_1': 5} + ) + + class BooleanEvalTest(fixtures.TestBase, testing.AssertsCompiledSQL): """test standalone booleans being wrapped in an AsBoolean, as well |