| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
 | # mysql/mysqlconnector.py
# Copyright (C) 2005-2017 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""
.. dialect:: mysql+mysqlconnector
    :name: MySQL Connector/Python
    :dbapi: myconnpy
    :connectstring: mysql+mysqlconnector://<user>:<password>@\
<host>[:<port>]/<dbname>
    :url: http://dev.mysql.com/downloads/connector/python/
Unicode
-------
Please see :ref:`mysql_unicode` for current recommendations on unicode
handling.
"""
from .base import (MySQLDialect, MySQLExecutionContext,
                   MySQLCompiler, MySQLIdentifierPreparer,
                   BIT)
from ... import util
import re
class MySQLExecutionContext_mysqlconnector(MySQLExecutionContext):
    def get_lastrowid(self):
        return self.cursor.lastrowid
class MySQLCompiler_mysqlconnector(MySQLCompiler):
    def visit_mod_binary(self, binary, operator, **kw):
        if self.dialect._mysqlconnector_double_percents:
            return self.process(binary.left, **kw) + " %% " + \
                self.process(binary.right, **kw)
        else:
            return self.process(binary.left, **kw) + " % " + \
                self.process(binary.right, **kw)
    def post_process_text(self, text):
        if self.dialect._mysqlconnector_double_percents:
            return text.replace('%', '%%')
        else:
            return text
    def escape_literal_column(self, text):
        if self.dialect._mysqlconnector_double_percents:
            return text.replace('%', '%%')
        else:
            return text
class MySQLIdentifierPreparer_mysqlconnector(MySQLIdentifierPreparer):
    def _escape_identifier(self, value):
        value = value.replace(self.escape_quote, self.escape_to_quote)
        if self.dialect._mysqlconnector_double_percents:
            return value.replace("%", "%%")
        else:
            return value
class _myconnpyBIT(BIT):
    def result_processor(self, dialect, coltype):
        """MySQL-connector already converts mysql bits, so."""
        return None
class MySQLDialect_mysqlconnector(MySQLDialect):
    driver = 'mysqlconnector'
    supports_unicode_binds = True
    supports_sane_rowcount = True
    supports_sane_multi_rowcount = True
    supports_native_decimal = True
    default_paramstyle = 'format'
    execution_ctx_cls = MySQLExecutionContext_mysqlconnector
    statement_compiler = MySQLCompiler_mysqlconnector
    preparer = MySQLIdentifierPreparer_mysqlconnector
    colspecs = util.update_copy(
        MySQLDialect.colspecs,
        {
            BIT: _myconnpyBIT,
        }
    )
    @util.memoized_property
    def supports_unicode_statements(self):
        return util.py3k or self._mysqlconnector_version_info > (2, 0)
    @classmethod
    def dbapi(cls):
        from mysql import connector
        return connector
    def create_connect_args(self, url):
        opts = url.translate_connect_args(username='user')
        opts.update(url.query)
        util.coerce_kw_type(opts, 'allow_local_infile', bool)
        util.coerce_kw_type(opts, 'autocommit', bool)
        util.coerce_kw_type(opts, 'buffered', bool)
        util.coerce_kw_type(opts, 'compress', bool)
        util.coerce_kw_type(opts, 'connection_timeout', int)
        util.coerce_kw_type(opts, 'connect_timeout', int)
        util.coerce_kw_type(opts, 'consume_results', bool)
        util.coerce_kw_type(opts, 'force_ipv6', bool)
        util.coerce_kw_type(opts, 'get_warnings', bool)
        util.coerce_kw_type(opts, 'pool_reset_session', bool)
        util.coerce_kw_type(opts, 'pool_size', int)
        util.coerce_kw_type(opts, 'raise_on_warnings', bool)
        util.coerce_kw_type(opts, 'raw', bool)
        util.coerce_kw_type(opts, 'ssl_verify_cert', bool)
        util.coerce_kw_type(opts, 'use_pure', bool)
        util.coerce_kw_type(opts, 'use_unicode', bool)
        # unfortunately, MySQL/connector python refuses to release a
        # cursor without reading fully, so non-buffered isn't an option
        opts.setdefault('buffered', True)
        # FOUND_ROWS must be set in ClientFlag to enable
        # supports_sane_rowcount.
        if self.dbapi is not None:
            try:
                from mysql.connector.constants import ClientFlag
                client_flags = opts.get(
                    'client_flags', ClientFlag.get_default())
                client_flags |= ClientFlag.FOUND_ROWS
                opts['client_flags'] = client_flags
            except Exception:
                pass
        return [[], opts]
    @util.memoized_property
    def _mysqlconnector_version_info(self):
        if self.dbapi and hasattr(self.dbapi, '__version__'):
            m = re.match(r'(\d+)\.(\d+)(?:\.(\d+))?',
                         self.dbapi.__version__)
            if m:
                return tuple(
                    int(x)
                    for x in m.group(1, 2, 3)
                    if x is not None)
    @util.memoized_property
    def _mysqlconnector_double_percents(self):
        return not util.py3k and self._mysqlconnector_version_info < (2, 0)
    def _get_server_version_info(self, connection):
        dbapi_con = connection.connection
        version = dbapi_con.get_server_version()
        return tuple(version)
    def _detect_charset(self, connection):
        return connection.connection.charset
    def _extract_error_code(self, exception):
        return exception.errno
    def is_disconnect(self, e, connection, cursor):
        errnos = (2006, 2013, 2014, 2045, 2055, 2048)
        exceptions = (self.dbapi.OperationalError, self.dbapi.InterfaceError)
        if isinstance(e, exceptions):
            return e.errno in errnos or \
                "MySQL Connection not available." in str(e)
        else:
            return False
    def _compat_fetchall(self, rp, charset=None):
        return rp.fetchall()
    def _compat_fetchone(self, rp, charset=None):
        return rp.fetchone()
    _isolation_lookup = set(['SERIALIZABLE', 'READ UNCOMMITTED',
                             'READ COMMITTED', 'REPEATABLE READ',
                             'AUTOCOMMIT'])
    def _set_isolation_level(self, connection, level):
        if level == 'AUTOCOMMIT':
            connection.autocommit = True
        else:
            connection.autocommit = False
            super(MySQLDialect_mysqlconnector, self)._set_isolation_level(
                connection, level)
dialect = MySQLDialect_mysqlconnector
 |