summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing/provision.py
blob: 0feb419873fd2e769a9f7e8cda6ae9d1cd678fa5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
from sqlalchemy.engine import url as sa_url
from sqlalchemy import text
from sqlalchemy import exc
from sqlalchemy.util import compat
from . import config, engines
import os
import time

FOLLOWER_IDENT = None


class register(object):
    """Dispatch a provisioning hook to a backend-specific implementation.

    An instance maps backend names to callables in ``fns``; the key
    ``"*"`` holds the fallback used when no entry matches the backend.
    """

    def __init__(self):
        # backend name (or "*") -> implementation callable
        self.fns = {}

    @classmethod
    def init(cls, fn):
        """Return a new registry whose ``"*"`` fallback is ``fn``.

        Used as a decorator in this module, so the decorated name ends up
        bound to the registry rather than to ``fn`` itself.
        """
        registry = register()
        install_default = registry.for_db("*")
        return install_default(fn)

    def for_db(self, dbname):
        """Return a decorator that stores a function under ``dbname``.

        The decorator hands back this registry, not the decorated function.
        """
        def decorate(fn):
            self.fns[dbname] = fn
            return self
        return decorate

    def __call__(self, cfg, *arg):
        """Invoke the implementation matching the backend of ``cfg``.

        ``cfg`` may be a URL string, a URL object, or any object exposing
        ``.db.url``.  ``cfg`` and ``arg`` are passed through unchanged to
        the selected callable, whose result is returned.
        """
        if isinstance(cfg, compat.string_types):
            target_url = sa_url.make_url(cfg)
        elif isinstance(cfg, sa_url.URL):
            target_url = cfg
        else:
            target_url = cfg.db.url

        name = target_url.get_backend_name()
        # fall back to the "*" entry when the backend has no own entry
        key = name if name in self.fns else '*'
        return self.fns[key](cfg, *arg)


def create_follower_db(follower_ident):
    """Run ``_create_db`` with ``follower_ident`` for every config yielded
    by ``_configs_for_db_operation()``.
    """
    for target in _configs_for_db_operation():
        _create_db(target, target.db, follower_ident)


def configure_follower(follower_ident):
    """Apply ``_configure_follower`` with ``follower_ident`` to every
    config returned by ``config.Config.all_configs()``.
    """
    for registered in config.Config.all_configs():
        _configure_follower(registered, follower_ident)


def setup_config(db_url, options, file_config, follower_ident):
    """Build a testing engine for ``db_url`` and register a config for it.

    When ``follower_ident`` is truthy, the URL is first rewritten through
    ``_follower_url_from_main`` and the registered config is afterwards
    passed to ``_configure_follower``.  Returns the registered config.
    """
    target_url = db_url
    if follower_ident:
        target_url = _follower_url_from_main(db_url, follower_ident)

    # the backend hook may add entries to this dict in place
    engine_opts = {}
    _update_db_opts(target_url, engine_opts)

    engine = engines.testing_engine(target_url, engine_opts)
    _post_configure_engine(target_url, engine, follower_ident)

    # open and immediately close one connection before registering
    connection = engine.connect()
    connection.close()

    registered = config.Config.register(
        engine, engine_opts, options, file_config)
    if follower_ident:
        _configure_follower(registered, follower_ident)
    return registered


def drop_follower_db(follower_ident):
    """Run ``_drop_db`` with ``follower_ident`` for every config yielded
    by ``_configs_for_db_operation()``.
    """
    for target in _configs_for_db_operation():
        _drop_db(target, target.db, follower_ident)


def _configs_for_db_operation():
    """Yield one config per distinct (backend, username, host, database).

    Every registered config's engine is disposed before the first config
    is yielded, and again once iteration has run to completion.
    """
    def dispose_all():
        for each in config.Config.all_configs():
            each.db.dispose()

    dispose_all()

    seen = set()
    for candidate in config.Config.all_configs():
        db_url = candidate.db.url
        key = (
            db_url.get_backend_name(),
            db_url.username,
            db_url.host,
            db_url.database,
        )
        if key in seen:
            continue
        yield candidate
        seen.add(key)

    dispose_all()


@register.init
def _create_db(cfg, eng, ident):
    """Fallback for backends without a registered creation routine.

    Always raises ``NotImplementedError`` naming the engine's URL.
    """
    message = "no DB creation routine for cfg: %s" % eng.url
    raise NotImplementedError(message)


@register.init
def _drop_db(cfg, eng, ident):
    """Fallback for backends without a registered drop routine.

    Always raises ``NotImplementedError`` naming the engine's URL.
    """
    message = "no DB drop routine for cfg: %s" % eng.url
    raise NotImplementedError(message)


@register.init
def _update_db_opts(db_url, db_opts):
    """Fallback hook: leave ``db_opts`` untouched and return ``None``."""


@register.init
def _configure_follower(cfg, ident):
    """Fallback hook: make no follower-specific changes to ``cfg``."""


@register.init
def _post_configure_engine(url, engine, follower_ident):
    """Fallback hook: do nothing to the newly created ``engine``."""


@register.init
def _follower_url_from_main(url, ident):
    """Fallback: return ``url`` as a URL object whose database is ``ident``.

    ``url`` is run through ``sa_url.make_url`` and the ``database``
    attribute of the result is overwritten.
    """
    follower_url = sa_url.make_url(url)
    follower_url.database = ident
    return follower_url


@_update_db_opts.for_db("mssql")
def _mssql_update_db_opts(db_url, db_opts):
    """Set the ``legacy_schema_aliasing`` engine option to ``False``."""
    db_opts.update({'legacy_schema_aliasing': False})


@_follower_url_from_main.for_db("sqlite")
def _sqlite_follower_url_from_main(url, ident):
    """Return the URL a SQLite follower should use.

    A URL with no database, or with ``:memory:``, is returned as parsed.
    Any other URL is replaced by one pointing at the file ``<ident>.db``.
    """
    parsed = sa_url.make_url(url)
    file_based = bool(parsed.database) and parsed.database != ':memory:'
    if file_based:
        return sa_url.make_url("sqlite:///%s.db" % ident)
    return parsed


@_post_configure_engine.for_db("sqlite")
def _sqlite_post_configure_engine(url, engine, follower_ident):
    """Attach a file database as ``test_schema`` on each new connection.

    A "connect" listener is installed on ``engine``.  Without a follower
    ident it attaches ``test_schema.db``; with one it attaches
    ``<follower_ident>_test_schema.db``.
    """
    from sqlalchemy import event

    @event.listens_for(engine, "connect")
    def attach_test_schema(dbapi_connection, connection_record):
        # always attach a file DB; a memory DB acts kind of strangely
        # when used as an attached database
        if follower_ident:
            statement = (
                'ATTACH DATABASE "%s_test_schema.db" AS test_schema'
                % follower_ident)
        else:
            statement = 'ATTACH DATABASE "test_schema.db" AS test_schema'
        dbapi_connection.execute(statement)


@_create_db.for_db("postgresql")
def _pg_create_db(cfg, eng, ident):
    """Create database ``ident`` using the current database as template.

    Any existing database of that name is dropped first on a best-effort
    basis (errors from the drop are ignored).  CREATE DATABASE is tried
    up to three times, sleeping 0.2 seconds between tries, as long as the
    ``OperationalError`` text contains "accessed by other users"; any
    other ``OperationalError``, or one on the final try, propagates.
    """
    max_attempts = 3
    autocommit = eng.connect().execution_options(
        isolation_level="AUTOCOMMIT")
    with autocommit as conn:
        try:
            _pg_drop_db(cfg, conn, ident)
        except Exception:
            # best effort: the database may simply not exist yet
            pass

        template_db = conn.scalar("select current_database()")
        for attempt_no in range(max_attempts):
            try:
                conn.execute(
                    "CREATE DATABASE %s TEMPLATE %s" % (ident, template_db))
            except exc.OperationalError as err:
                out_of_tries = attempt_no == max_attempts - 1
                if out_of_tries or "accessed by other users" not in str(err):
                    raise
                time.sleep(.2)
            else:
                break


@_create_db.for_db("mysql")
def _mysql_create_db(cfg, eng, ident):
    """Create the follower database and its two test-schema databases.

    ``_mysql_drop_db`` is called first on a best-effort basis; any error
    it raises is ignored.
    """
    create_templates = (
        "CREATE DATABASE %s",
        "CREATE DATABASE %s_test_schema",
        "CREATE DATABASE %s_test_schema_2",
    )
    with eng.connect() as conn:
        try:
            _mysql_drop_db(cfg, conn, ident)
        except Exception:
            pass
        for template in create_templates:
            conn.execute(template % ident)


@_configure_follower.for_db("mysql")
def _mysql_configure_follower(config, ident):
    """Set the config's two test schema names from ``ident``."""
    primary_schema = "%s_test_schema" % ident
    secondary_schema = "%s_test_schema_2" % ident
    config.test_schema = primary_schema
    config.test_schema_2 = secondary_schema


@_create_db.for_db("sqlite")
def _sqlite_create_db(cfg, eng, ident):
    """Do nothing: no explicit creation step is performed for SQLite."""


@_drop_db.for_db("postgresql")
def _pg_drop_db(cfg, eng, ident):
    """Drop database ``ident`` on an AUTOCOMMIT connection.

    Before the drop, ``pg_terminate_backend`` is invoked for the current
    user's other backends whose ``datname`` equals ``ident``.
    """
    terminate_backends = text(
        "select pg_terminate_backend(pid) from pg_stat_activity "
        "where usename=current_user and pid != pg_backend_pid() "
        "and datname=:dname"
    )
    autocommit = eng.connect().execution_options(
        isolation_level="AUTOCOMMIT")
    with autocommit as conn:
        conn.execute(terminate_backends, dname=ident)
        conn.execute("DROP DATABASE %s" % ident)


@_drop_db.for_db("sqlite")
def _sqlite_drop_db(cfg, eng, ident):
    """Delete the SQLite files that belong to follower ``ident``.

    Two files are candidates: ``<ident>.db``, the follower database named
    by ``_sqlite_follower_url_from_main`` for file-based setups, and
    ``<ident>_test_schema.db``, the file attached as ``test_schema`` by
    ``_sqlite_post_configure_engine``.  Each is removed only if it exists,
    because an in-memory setup never creates ``<ident>.db``.

    A falsy ``ident`` names no follower, so nothing is removed.
    """
    if not ident:
        # Formatting a falsy ident would produce a bogus name such as
        # "None.db"; there is no follower file to clean up in that case.
        return

    for path in ("%s.db" % ident, "%s_test_schema.db" % ident):
        if os.path.exists(path):
            os.remove(path)


@_drop_db.for_db("mysql")
def _mysql_drop_db(cfg, eng, ident):
    """Drop the follower's two test-schema databases, then its main one.

    Each DROP is attempted independently; an error from one statement is
    ignored and the remaining statements are still executed.
    """
    drop_templates = (
        "DROP DATABASE %s_test_schema",
        "DROP DATABASE %s_test_schema_2",
        "DROP DATABASE %s",
    )
    with eng.connect() as conn:
        for template in drop_templates:
            try:
                conn.execute(template % ident)
            except Exception:
                # best effort: keep going with the remaining databases
                pass


@_create_db.for_db("oracle")
def _oracle_create_db(cfg, eng, ident):
    """Create the follower user plus its ``_ts1`` and ``_ts2`` users.

    All three users are identified by "xe" and are granted unlimited
    tablespace; the follower user itself is also granted ``dba``.
    """
    # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or
    # similar, so that the default tablespace is not "system"; reflection will
    # fail otherwise
    with eng.connect() as conn:
        statements = [
            "create user %s identified by xe" % ident,
            "create user %s_ts1 identified by xe" % ident,
            "create user %s_ts2 identified by xe" % ident,
            "grant dba to %s" % (ident, ),
            "grant unlimited tablespace to %s" % ident,
            "grant unlimited tablespace to %s_ts1" % ident,
            "grant unlimited tablespace to %s_ts2" % ident,
        ]
        for statement in statements:
            conn.execute(statement)

@_configure_follower.for_db("oracle")
def _oracle_configure_follower(config, ident):
    """Set the config's two test schema names to the ``_ts1``/``_ts2``
    names derived from ``ident``.
    """
    primary_schema = "%s_ts1" % ident
    secondary_schema = "%s_ts2" % ident
    config.test_schema = primary_schema
    config.test_schema_2 = secondary_schema


@_drop_db.for_db("oracle")
def _oracle_drop_db(cfg, eng, ident):
    """Drop the follower user and its ``_ts1``/``_ts2`` users with cascade.

    Errors are not caught: the first failing statement propagates and the
    remaining ones are not executed.
    """
    drop_templates = (
        "drop user %s cascade",
        "drop user %s_ts1 cascade",
        "drop user %s_ts2 cascade",
    )
    with eng.connect() as conn:
        for template in drop_templates:
            conn.execute(template % ident)


@_follower_url_from_main.for_db("oracle")
def _oracle_follower_url_from_main(url, ident):
    """Return ``url`` as a URL object that logs in as the follower user.

    The username is set to ``ident`` and the password to ``'xe'``.
    """
    follower_url = sa_url.make_url(url)
    follower_url.username = ident
    follower_url.password = 'xe'
    return follower_url