summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/connectors/pyodbc.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/connectors/pyodbc.py')
-rw-r--r--lib/sqlalchemy/connectors/pyodbc.py77
1 files changed, 39 insertions, 38 deletions
diff --git a/lib/sqlalchemy/connectors/pyodbc.py b/lib/sqlalchemy/connectors/pyodbc.py
index 41ba89de6..8f5eea89b 100644
--- a/lib/sqlalchemy/connectors/pyodbc.py
+++ b/lib/sqlalchemy/connectors/pyodbc.py
@@ -13,7 +13,7 @@ import re
class PyODBCConnector(Connector):
- driver = 'pyodbc'
+ driver = "pyodbc"
supports_sane_rowcount_returning = False
supports_sane_multi_rowcount = False
@@ -22,7 +22,7 @@ class PyODBCConnector(Connector):
supports_unicode_binds = True
supports_native_decimal = True
- default_paramstyle = 'named'
+ default_paramstyle = "named"
# for non-DSN connections, this *may* be used to
# hold the desired driver name
@@ -35,10 +35,10 @@ class PyODBCConnector(Connector):
@classmethod
def dbapi(cls):
- return __import__('pyodbc')
+ return __import__("pyodbc")
def create_connect_args(self, url):
- opts = url.translate_connect_args(username='user')
+ opts = url.translate_connect_args(username="user")
opts.update(url.query)
keys = opts
@@ -46,52 +46,55 @@ class PyODBCConnector(Connector):
query = url.query
connect_args = {}
- for param in ('ansi', 'unicode_results', 'autocommit'):
+ for param in ("ansi", "unicode_results", "autocommit"):
if param in keys:
connect_args[param] = util.asbool(keys.pop(param))
- if 'odbc_connect' in keys:
- connectors = [util.unquote_plus(keys.pop('odbc_connect'))]
+ if "odbc_connect" in keys:
+ connectors = [util.unquote_plus(keys.pop("odbc_connect"))]
else:
+
def check_quote(token):
if ";" in str(token):
token = "'%s'" % token
return token
- keys = dict(
- (k, check_quote(v)) for k, v in keys.items()
- )
+ keys = dict((k, check_quote(v)) for k, v in keys.items())
- dsn_connection = 'dsn' in keys or \
- ('host' in keys and 'database' not in keys)
+ dsn_connection = "dsn" in keys or (
+ "host" in keys and "database" not in keys
+ )
if dsn_connection:
- connectors = ['dsn=%s' % (keys.pop('host', '') or
- keys.pop('dsn', ''))]
+ connectors = [
+ "dsn=%s" % (keys.pop("host", "") or keys.pop("dsn", ""))
+ ]
else:
- port = ''
- if 'port' in keys and 'port' not in query:
- port = ',%d' % int(keys.pop('port'))
+ port = ""
+ if "port" in keys and "port" not in query:
+ port = ",%d" % int(keys.pop("port"))
connectors = []
- driver = keys.pop('driver', self.pyodbc_driver_name)
+ driver = keys.pop("driver", self.pyodbc_driver_name)
if driver is None:
util.warn(
"No driver name specified; "
"this is expected by PyODBC when using "
- "DSN-less connections")
+ "DSN-less connections"
+ )
else:
connectors.append("DRIVER={%s}" % driver)
connectors.extend(
[
- 'Server=%s%s' % (keys.pop('host', ''), port),
- 'Database=%s' % keys.pop('database', '')
- ])
+ "Server=%s%s" % (keys.pop("host", ""), port),
+ "Database=%s" % keys.pop("database", ""),
+ ]
+ )
user = keys.pop("user", None)
if user:
connectors.append("UID=%s" % user)
- connectors.append("PWD=%s" % keys.pop('password', ''))
+ connectors.append("PWD=%s" % keys.pop("password", ""))
else:
connectors.append("Trusted_Connection=Yes")
@@ -99,18 +102,20 @@ class PyODBCConnector(Connector):
# convert textual data from your database encoding to your
# client encoding. This should obviously be set to 'No' if
# you query a cp1253 encoded database from a latin1 client...
- if 'odbc_autotranslate' in keys:
- connectors.append("AutoTranslate=%s" %
- keys.pop("odbc_autotranslate"))
+ if "odbc_autotranslate" in keys:
+ connectors.append(
+ "AutoTranslate=%s" % keys.pop("odbc_autotranslate")
+ )
- connectors.extend(['%s=%s' % (k, v) for k, v in keys.items()])
+ connectors.extend(["%s=%s" % (k, v) for k, v in keys.items()])
return [[";".join(connectors)], connect_args]
def is_disconnect(self, e, connection, cursor):
if isinstance(e, self.dbapi.ProgrammingError):
- return "The cursor's connection has been closed." in str(e) or \
- 'Attempt to use a closed connection.' in str(e)
+ return "The cursor's connection has been closed." in str(
+ e
+ ) or "Attempt to use a closed connection." in str(e)
else:
return False
@@ -123,10 +128,7 @@ class PyODBCConnector(Connector):
return self._parse_dbapi_version(self.dbapi.version)
def _parse_dbapi_version(self, vers):
- m = re.match(
- r'(?:py.*-)?([\d\.]+)(?:-(\w+))?',
- vers
- )
+ m = re.match(r"(?:py.*-)?([\d\.]+)(?:-(\w+))?", vers)
if not m:
return ()
vers = tuple([int(x) for x in m.group(1).split(".")])
@@ -140,7 +142,7 @@ class PyODBCConnector(Connector):
# queries.
dbapi_con = connection.connection
version = []
- r = re.compile(r'[.\-]')
+ r = re.compile(r"[.\-]")
for n in r.split(dbapi_con.getinfo(self.dbapi.SQL_DBMS_VER)):
try:
version.append(int(n))
@@ -153,12 +155,11 @@ class PyODBCConnector(Connector):
# adjust for ConnectionFairy being present
# allows attribute set e.g. "connection.autocommit = True"
# to work properly
- if hasattr(connection, 'connection'):
+ if hasattr(connection, "connection"):
connection = connection.connection
- if level == 'AUTOCOMMIT':
+ if level == "AUTOCOMMIT":
connection.autocommit = True
else:
connection.autocommit = False
- super(PyODBCConnector, self).set_isolation_level(connection,
- level)
+ super(PyODBCConnector, self).set_isolation_level(connection, level)