1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
|
from sqlalchemy.test.testing import eq_
from sqlalchemy import *
from sqlalchemy.databases import firebird
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.sql import table, column
from sqlalchemy.test import *
class DomainReflectionTest(TestBase, AssertsExecutionResults):
"Test Firebird domains"
__only_on__ = 'firebird'
@classmethod
def setup_class(cls):
con = testing.db.connect()
try:
con.execute('CREATE DOMAIN int_domain AS INTEGER DEFAULT 42 NOT NULL')
con.execute('CREATE DOMAIN str_domain AS VARCHAR(255)')
con.execute('CREATE DOMAIN rem_domain AS BLOB SUB_TYPE TEXT')
con.execute('CREATE DOMAIN img_domain AS BLOB SUB_TYPE BINARY')
except ProgrammingError, e:
if not "attempt to store duplicate value" in str(e):
raise e
con.execute('''CREATE GENERATOR gen_testtable_id''')
con.execute('''CREATE TABLE testtable (question int_domain,
answer str_domain DEFAULT 'no answer',
remark rem_domain DEFAULT '',
photo img_domain,
d date,
t time,
dt timestamp)''')
con.execute('''ALTER TABLE testtable
ADD CONSTRAINT testtable_pk PRIMARY KEY (question)''')
con.execute('''CREATE TRIGGER testtable_autoid FOR testtable
ACTIVE BEFORE INSERT AS
BEGIN
IF (NEW.question IS NULL) THEN
NEW.question = gen_id(gen_testtable_id, 1);
END''')
@classmethod
def teardown_class(cls):
con = testing.db.connect()
con.execute('DROP TABLE testtable')
con.execute('DROP DOMAIN int_domain')
con.execute('DROP DOMAIN str_domain')
con.execute('DROP DOMAIN rem_domain')
con.execute('DROP DOMAIN img_domain')
con.execute('DROP GENERATOR gen_testtable_id')
def test_table_is_reflected(self):
metadata = MetaData(testing.db)
table = Table('testtable', metadata, autoload=True)
eq_(set(table.columns.keys()),
set(['question', 'answer', 'remark', 'photo', 'd', 't', 'dt']),
"Columns of reflected table didn't equal expected columns")
eq_(table.c.question.primary_key, True)
eq_(table.c.question.sequence.name, 'gen_testtable_id')
eq_(table.c.question.type.__class__, firebird.FBInteger)
eq_(table.c.question.server_default.arg.text, "42")
eq_(table.c.answer.type.__class__, firebird.FBString)
eq_(table.c.answer.server_default.arg.text, "'no answer'")
eq_(table.c.remark.type.__class__, firebird.FBText)
eq_(table.c.remark.server_default.arg.text, "''")
eq_(table.c.photo.type.__class__, firebird.FBBinary)
# The following assume a Dialect 3 database
eq_(table.c.d.type.__class__, firebird.FBDate)
eq_(table.c.t.type.__class__, firebird.FBTime)
eq_(table.c.dt.type.__class__, firebird.FBDateTime)
class CompileTest(TestBase, AssertsCompiledSQL):
__dialect__ = firebird.FBDialect()
def test_alias(self):
t = table('sometable', column('col1'), column('col2'))
s = select([t.alias()])
self.assert_compile(s, "SELECT sometable_1.col1, sometable_1.col2 FROM sometable sometable_1")
def test_function(self):
self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)")
self.assert_compile(func.current_time(), "CURRENT_TIME")
self.assert_compile(func.foo(), "foo")
m = MetaData()
t = Table('sometable', m, Column('col1', Integer), Column('col2', Integer))
self.assert_compile(select([func.max(t.c.col1)]), "SELECT max(sometable.col1) AS max_1 FROM sometable")
def test_substring(self):
self.assert_compile(func.substring('abc', 1, 2), "SUBSTRING(:substring_1 FROM :substring_2 FOR :substring_3)")
self.assert_compile(func.substring('abc', 1), "SUBSTRING(:substring_1 FROM :substring_2)")
def test_update_returning(self):
table1 = table('mytable',
column('myid', Integer),
column('name', String(128)),
column('description', String(128)),
)
u = update(table1, values=dict(name='foo'), firebird_returning=[table1.c.myid, table1.c.name])
self.assert_compile(u, "UPDATE mytable SET name=:name RETURNING mytable.myid, mytable.name")
u = update(table1, values=dict(name='foo'), firebird_returning=[table1])
self.assert_compile(u, "UPDATE mytable SET name=:name "\
"RETURNING mytable.myid, mytable.name, mytable.description")
u = update(table1, values=dict(name='foo'), firebird_returning=[func.length(table1.c.name)])
self.assert_compile(u, "UPDATE mytable SET name=:name RETURNING char_length(mytable.name)")
def test_insert_returning(self):
table1 = table('mytable',
column('myid', Integer),
column('name', String(128)),
column('description', String(128)),
)
i = insert(table1, values=dict(name='foo'), firebird_returning=[table1.c.myid, table1.c.name])
self.assert_compile(i, "INSERT INTO mytable (name) VALUES (:name) RETURNING mytable.myid, mytable.name")
i = insert(table1, values=dict(name='foo'), firebird_returning=[table1])
self.assert_compile(i, "INSERT INTO mytable (name) VALUES (:name) "\
"RETURNING mytable.myid, mytable.name, mytable.description")
i = insert(table1, values=dict(name='foo'), firebird_returning=[func.length(table1.c.name)])
self.assert_compile(i, "INSERT INTO mytable (name) VALUES (:name) RETURNING char_length(mytable.name)")
class ReturningTest(TestBase, AssertsExecutionResults):
__only_on__ = 'firebird'
@testing.exclude('firebird', '<', (2, 1), '2.1+ feature')
def test_update_returning(self):
meta = MetaData(testing.db)
table = Table('tables', meta,
Column('id', Integer, Sequence('gen_tables_id'), primary_key=True),
Column('persons', Integer),
Column('full', Boolean)
)
table.create()
try:
table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}])
result = table.update(table.c.persons > 4, dict(full=True), firebird_returning=[table.c.id]).execute()
eq_(result.fetchall(), [(1,)])
result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute()
eq_(result2.fetchall(), [(1,True),(2,False)])
finally:
table.drop()
@testing.exclude('firebird', '<', (2, 0), '2.0+ feature')
def test_insert_returning(self):
meta = MetaData(testing.db)
table = Table('tables', meta,
Column('id', Integer, Sequence('gen_tables_id'), primary_key=True),
Column('persons', Integer),
Column('full', Boolean)
)
table.create()
try:
result = table.insert(firebird_returning=[table.c.id]).execute({'persons': 1, 'full': False})
eq_(result.fetchall(), [(1,)])
# Multiple inserts only return the last row
result2 = table.insert(firebird_returning=[table]).execute(
[{'persons': 2, 'full': False}, {'persons': 3, 'full': True}])
eq_(result2.fetchall(), [(3,3,True)])
result3 = table.insert(firebird_returning=[table.c.id]).execute({'persons': 4, 'full': False})
eq_([dict(row) for row in result3], [{'ID':4}])
result4 = testing.db.execute('insert into tables (id, persons, "full") values (5, 10, 1) returning persons')
eq_([dict(row) for row in result4], [{'PERSONS': 10}])
finally:
table.drop()
@testing.exclude('firebird', '<', (2, 1), '2.1+ feature')
def test_delete_returning(self):
meta = MetaData(testing.db)
table = Table('tables', meta,
Column('id', Integer, Sequence('gen_tables_id'), primary_key=True),
Column('persons', Integer),
Column('full', Boolean)
)
table.create()
try:
table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}])
result = table.delete(table.c.persons > 4, firebird_returning=[table.c.id]).execute()
eq_(result.fetchall(), [(1,)])
result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute()
eq_(result2.fetchall(), [(2,False),])
finally:
table.drop()
class MiscFBTests(TestBase):
__only_on__ = 'firebird'
def test_strlen(self):
# On FB the length() function is implemented by an external
# UDF, strlen(). Various SA tests fail because they pass a
# parameter to it, and that does not work (it always results
# the maximum string length the UDF was declared to accept).
# This test checks that at least it works ok in other cases.
meta = MetaData(testing.db)
t = Table('t1', meta,
Column('id', Integer, Sequence('t1idseq'), primary_key=True),
Column('name', String(10))
)
meta.create_all()
try:
t.insert(values=dict(name='dante')).execute()
t.insert(values=dict(name='alighieri')).execute()
select([func.count(t.c.id)],func.length(t.c.name)==5).execute().fetchone()[0] == 1
finally:
meta.drop_all()
def test_server_version_info(self):
version = testing.db.dialect.server_version_info(testing.db.connect())
assert len(version) == 3, "Got strange version info: %s" % repr(version)
|