path: root/coverage/sqldata.py
author     Ned Batchelder <ned@nedbatchelder.com>  2022-12-29 13:57:52 -0500
committer  Ned Batchelder <ned@nedbatchelder.com>  2022-12-29 13:57:52 -0500
commit     cde67d3a710b20fbe25a1e22aeaa1a0ed552ae6f (patch)
tree       be61ea6ddd70a3b8bbcce62a661bccf1dbc7f616 /coverage/sqldata.py
parent     124b3758c0da8c8fa9f11bfd93700cdcf52a789d (diff)
download   python-coveragepy-git-cde67d3a710b20fbe25a1e22aeaa1a0ed552ae6f.tar.gz
mypy: check sqldata.py
Diffstat (limited to 'coverage/sqldata.py')
-rw-r--r--  coverage/sqldata.py  210
1 file changed, 122 insertions, 88 deletions
diff --git a/coverage/sqldata.py b/coverage/sqldata.py
index 4caa13d2..2a42e122 100644
--- a/coverage/sqldata.py
+++ b/coverage/sqldata.py
@@ -3,6 +3,8 @@
"""SQLite coverage data."""
+from __future__ import annotations
+
import collections
import contextlib
import datetime
@@ -19,16 +21,22 @@ import textwrap
import threading
import zlib
+from typing import (
+ cast, Any, Callable, Dict, Generator, Iterable, List, Optional,
+ Sequence, Set, Tuple, TypeVar, Union,
+)
+
from coverage.debug import NoDebugging, AutoReprMixin, clipped_repr
from coverage.exceptions import CoverageException, DataError
from coverage.files import PathAliases
-from coverage.misc import contract, file_be_gone, isolate_module
+from coverage.misc import file_be_gone, isolate_module
from coverage.numbits import numbits_to_nums, numbits_union, nums_to_numbits
+from coverage.types import TArc, TDebugCtl, TLineNo, TWarnFn
from coverage.version import __version__
os = isolate_module(os)
-# If you change the schema, increment the SCHEMA_VERSION, and update the
+# If you change the schema: increment the SCHEMA_VERSION and update the
# docs in docs/dbschema.rst by running "make cogdoc".
SCHEMA_VERSION = 7
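For reference, the TLineNo and TArc names imported above from coverage.types are lightweight aliases. Roughly (a sketch, not the verbatim definitions from coverage/types.py):

from typing import Tuple

TLineNo = int                    # a source line number
TArc = Tuple[TLineNo, TLineNo]   # an executed (from line, to line) pair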
@@ -104,6 +112,21 @@ CREATE TABLE tracer (
);
"""
+TMethod = TypeVar("TMethod", bound=Callable[..., Any])
+
+def _locked(method: TMethod) -> TMethod:
+ """A decorator for methods that should hold self._lock."""
+ @functools.wraps(method)
+ def _wrapped(self: CoverageData, *args: Any, **kwargs: Any) -> Any:
+ if self._debug.should("lock"):
+ self._debug.write(f"Locking {self._lock!r} for {method.__name__}")
+ with self._lock:
+ if self._debug.should("lock"):
+ self._debug.write(f"Locked {self._lock!r} for {method.__name__}")
+ return method(self, *args, **kwargs)
+ return _wrapped # type: ignore[return-value]
+
+
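The _locked decorator, now at module level, uses a TypeVar bound to Callable so that mypy sees the decorated method with its original signature. A minimal sketch of the same pattern, using invented names:

import functools
from typing import Any, Callable, TypeVar

TFunc = TypeVar("TFunc", bound=Callable[..., Any])

def traced(func: TFunc) -> TFunc:
    """Log each call without changing the signature mypy reports."""
    @functools.wraps(func)
    def _wrapper(*args: Any, **kwargs: Any) -> Any:
        print(f"calling {func.__name__}")
        return func(*args, **kwargs)
    return _wrapper  # type: ignore[return-value]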
class CoverageData(AutoReprMixin):
"""Manages collected coverage data, including file storage.
@@ -187,7 +210,14 @@ class CoverageData(AutoReprMixin):
"""
- def __init__(self, basename=None, suffix=None, no_disk=False, warn=None, debug=None):
+ def __init__(
+ self,
+ basename: Optional[str]=None,
+ suffix: Optional[Union[str, bool]]=None,
+ no_disk: bool=False,
+ warn: Optional[TWarnFn]=None,
+ debug: Optional[TDebugCtl]=None,
+ ) -> None:
"""Create a :class:`CoverageData` object to hold coverage-measured data.
Arguments:
@@ -209,9 +239,10 @@ class CoverageData(AutoReprMixin):
self._debug = debug or NoDebugging()
self._choose_filename()
- self._file_map = {}
+ # Maps filenames to row ids.
+ self._file_map: Dict[str, int] = {}
# Maps thread ids to SqliteDb objects.
- self._dbs = {}
+ self._dbs: Dict[int, SqliteDb] = {}
self._pid = os.getpid()
# Synchronize the operations used during collection.
self._lock = threading.RLock()
@@ -222,24 +253,11 @@ class CoverageData(AutoReprMixin):
self._has_lines = False
self._has_arcs = False
- self._current_context = None
- self._current_context_id = None
- self._query_context_ids = None
+ self._current_context: Optional[str] = None
+ self._current_context_id: Optional[int] = None
+ self._query_context_ids: Optional[List[int]] = None
- def _locked(method): # pylint: disable=no-self-argument
- """A decorator for methods that should hold self._lock."""
- @functools.wraps(method)
- def _wrapped(self, *args, **kwargs):
- if self._debug.should("lock"):
- self._debug.write(f"Locking {self._lock!r} for {method.__name__}")
- with self._lock:
- if self._debug.should("lock"):
- self._debug.write(f"Locked {self._lock!r} for {method.__name__}")
- # pylint: disable=not-callable
- return method(self, *args, **kwargs)
- return _wrapped
-
- def _choose_filename(self):
+ def _choose_filename(self) -> None:
"""Set self._filename based on inited attributes."""
if self._no_disk:
self._filename = ":memory:"
@@ -249,7 +267,7 @@ class CoverageData(AutoReprMixin):
if suffix:
self._filename += "." + suffix
- def _reset(self):
+ def _reset(self) -> None:
"""Reset our attributes."""
if not self._no_disk:
for db in self._dbs.values():
@@ -259,18 +277,18 @@ class CoverageData(AutoReprMixin):
self._have_used = False
self._current_context_id = None
- def _open_db(self):
+ def _open_db(self) -> None:
"""Open an existing db file, and read its metadata."""
if self._debug.should("dataio"):
self._debug.write(f"Opening data file {self._filename!r}")
self._dbs[threading.get_ident()] = SqliteDb(self._filename, self._debug)
self._read_db()
- def _read_db(self):
+ def _read_db(self) -> None:
"""Read the metadata from a database so that we are ready to use it."""
with self._dbs[threading.get_ident()] as db:
try:
- schema_version, = db.execute_one("select version from coverage_schema")
+ row = db.execute_one("select version from coverage_schema")
except Exception as exc:
if "no such table: coverage_schema" in str(exc):
self._init_db(db)
@@ -281,6 +299,10 @@ class CoverageData(AutoReprMixin):
)
) from exc
else:
+ if row is None:
+ schema_version = None
+ else:
+ schema_version = row[0]
if schema_version != SCHEMA_VERSION:
raise DataError(
"Couldn't use data file {!r}: wrong schema: {} instead of {}".format(
@@ -288,16 +310,16 @@ class CoverageData(AutoReprMixin):
)
)
- with db.execute("select value from meta where key = 'has_arcs'") as cur:
- for row in cur:
- self._has_arcs = bool(int(row[0]))
- self._has_lines = not self._has_arcs
+ row = db.execute_one("select value from meta where key = 'has_arcs'")
+ if row is not None:
+ self._has_arcs = bool(int(row[0]))
+ self._has_lines = not self._has_arcs
with db.execute("select id, path from file") as cur:
for file_id, path in cur:
self._file_map[path] = file_id
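_read_db now reads both the schema version and the has_arcs flag through execute_one, which returns an Optional row, so each result is checked for None before indexing. The same idiom in a standalone, hedged sketch (hypothetical helper, plain sqlite3):

import sqlite3
from typing import Optional

def fetch_schema_version(con: sqlite3.Connection) -> Optional[int]:
    """Return the stored schema version, or None if the row is missing."""
    row = con.execute("select version from coverage_schema").fetchone()
    if row is None:
        return None
    return int(row[0])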
- def _init_db(self, db):
+ def _init_db(self, db: SqliteDb) -> None:
"""Write the initial contents of the database."""
if self._debug.should("dataio"):
self._debug.write(f"Initing data file {self._filename!r}")
@@ -316,13 +338,13 @@ class CoverageData(AutoReprMixin):
])
db.executemany_void("insert or ignore into meta (key, value) values (?, ?)", meta_data)
- def _connect(self):
+ def _connect(self) -> SqliteDb:
"""Get the SqliteDb object to use."""
if threading.get_ident() not in self._dbs:
self._open_db()
return self._dbs[threading.get_ident()]
- def __bool__(self):
+ def __bool__(self) -> bool:
if (threading.get_ident() not in self._dbs and not os.path.exists(self._filename)):
return False
try:
@@ -332,8 +354,7 @@ class CoverageData(AutoReprMixin):
except CoverageException:
return False
- @contract(returns="bytes")
- def dumps(self):
+ def dumps(self) -> bytes:
"""Serialize the current data to a byte string.
The format of the serialized data is not documented. It is only
@@ -356,8 +377,7 @@ class CoverageData(AutoReprMixin):
script = con.dump()
return b"z" + zlib.compress(script.encode("utf-8"))
- @contract(data="bytes")
- def loads(self, data):
+ def loads(self, data: bytes) -> None:
"""Deserialize data from :meth:`dumps`.
Use with a newly-created empty :class:`CoverageData` object. It's
@@ -385,7 +405,7 @@ class CoverageData(AutoReprMixin):
self._read_db()
self._have_used = True
- def _file_id(self, filename, add=False):
+ def _file_id(self, filename: str, add: bool=False) -> Optional[int]:
"""Get the file id for `filename`.
If filename is not in the database yet, add it if `add` is True.
@@ -400,19 +420,19 @@ class CoverageData(AutoReprMixin):
)
return self._file_map.get(filename)
- def _context_id(self, context):
+ def _context_id(self, context: str) -> Optional[int]:
"""Get the id for a context."""
assert context is not None
self._start_using()
with self._connect() as con:
row = con.execute_one("select id from context where context = ?", (context,))
if row is not None:
- return row[0]
+ return cast(int, row[0])
else:
return None
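The cast(int, row[0]) added above exists only for the type checker: sqlite3 rows come back effectively untyped, and cast has no runtime effect. As a rough standalone sketch with a hypothetical helper:

import sqlite3
from typing import Optional, cast

def lookup_context_id(con: sqlite3.Connection, name: str) -> Optional[int]:
    """Return the id column as an int for mypy, or None if not found."""
    row = con.execute("select id from context where context = ?", (name,)).fetchone()
    if row is None:
        return None
    return cast(int, row[0])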
@_locked
- def set_context(self, context):
+ def set_context(self, context: str) -> None:
"""Set the current context for future :meth:`add_lines` etc.
`context` is a str, the name of the context to use for the next data
@@ -426,7 +446,7 @@ class CoverageData(AutoReprMixin):
self._current_context = context
self._current_context_id = None
- def _set_context_id(self):
+ def _set_context_id(self) -> None:
"""Use the _current_context to set _current_context_id."""
context = self._current_context or ""
context_id = self._context_id(context)
@@ -439,7 +459,7 @@ class CoverageData(AutoReprMixin):
(context,)
)
- def base_filename(self):
+ def base_filename(self) -> str:
"""The base filename for storing data.
.. versionadded:: 5.0
@@ -447,7 +467,7 @@ class CoverageData(AutoReprMixin):
"""
return self._basename
- def data_filename(self):
+ def data_filename(self) -> str:
"""Where is the data stored?
.. versionadded:: 5.0
@@ -456,7 +476,7 @@ class CoverageData(AutoReprMixin):
return self._filename
@_locked
- def add_lines(self, line_data):
+ def add_lines(self, line_data: Dict[str, Sequence[TLineNo]]) -> None:
"""Add measured line data.
`line_data` is a dictionary mapping file names to iterables of ints::
@@ -466,7 +486,7 @@ class CoverageData(AutoReprMixin):
"""
if self._debug.should("dataop"):
self._debug.write("Adding lines: %d files, %d lines total" % (
- len(line_data), sum(len(lines) for lines in line_data.values())
+ len(line_data), sum(bool(len(lines)) for lines in line_data.values())
))
self._start_using()
self._choose_lines_or_arcs(lines=True)
@@ -490,7 +510,7 @@ class CoverageData(AutoReprMixin):
)
@_locked
- def add_arcs(self, arc_data):
+ def add_arcs(self, arc_data: Dict[str, Set[TArc]]) -> None:
"""Add measured arc data.
`arc_data` is a dictionary mapping file names to iterables of pairs of
@@ -518,7 +538,7 @@ class CoverageData(AutoReprMixin):
data,
)
- def _choose_lines_or_arcs(self, lines=False, arcs=False):
+ def _choose_lines_or_arcs(self, lines: bool=False, arcs: bool=False) -> None:
"""Force the data file to choose between lines and arcs."""
assert lines or arcs
assert not (lines and arcs)
@@ -540,7 +560,7 @@ class CoverageData(AutoReprMixin):
)
@_locked
- def add_file_tracers(self, file_tracers):
+ def add_file_tracers(self, file_tracers: Dict[str, str]) -> None:
"""Add per-file plugin information.
`file_tracers` is { filename: plugin_name, ... }
@@ -573,7 +593,7 @@ class CoverageData(AutoReprMixin):
(file_id, plugin_name)
)
- def touch_file(self, filename, plugin_name=""):
+ def touch_file(self, filename: str, plugin_name: str="") -> None:
"""Ensure that `filename` appears in the data, empty if needed.
`plugin_name` is the name of the plugin responsible for this file. It is used
@@ -581,7 +601,7 @@ class CoverageData(AutoReprMixin):
"""
self.touch_files([filename], plugin_name)
- def touch_files(self, filenames, plugin_name=""):
+ def touch_files(self, filenames: Iterable[str], plugin_name: str="") -> None:
"""Ensure that `filenames` appear in the data, empty if needed.
`plugin_name` is the name of the plugin responsible for these files. It is used
@@ -600,7 +620,7 @@ class CoverageData(AutoReprMixin):
# Set the tracer for this file
self.add_file_tracers({filename: plugin_name})
- def update(self, other_data, aliases=None):
+ def update(self, other_data: CoverageData, aliases: Optional[PathAliases]=None) -> None:
"""Update this data with data from several other :class:`CoverageData` instances.
If `aliases` is provided, it's a `PathAliases` object that is used to
@@ -652,7 +672,7 @@ class CoverageData(AutoReprMixin):
"inner join file on file.id = line_bits.file_id " +
"inner join context on context.id = line_bits.context_id"
) as cur:
- lines = {}
+ lines: Dict[Tuple[str, str], bytes] = {}
for path, context, numbits in cur:
key = (files[path], context)
if key in lines:
@@ -668,6 +688,7 @@ class CoverageData(AutoReprMixin):
tracers = {files[path]: tracer for (path, tracer) in cur}
with self._connect() as con:
+ assert con.con is not None
con.con.isolation_level = "IMMEDIATE"
# Get all tracers in the DB. Files not in the tracers are assumed
@@ -768,7 +789,7 @@ class CoverageData(AutoReprMixin):
self._reset()
self.read()
- def erase(self, parallel=False):
+ def erase(self, parallel: bool=False) -> None:
"""Erase the data in this object.
If `parallel` is true, then also deletes data files created from the
@@ -790,17 +811,17 @@ class CoverageData(AutoReprMixin):
self._debug.write(f"Erasing parallel data file {filename!r}")
file_be_gone(filename)
- def read(self):
+ def read(self) -> None:
"""Start using an existing data file."""
if os.path.exists(self._filename):
with self._connect():
self._have_used = True
- def write(self):
+ def write(self) -> None:
"""Ensure the data is written to the data file."""
pass
- def _start_using(self):
+ def _start_using(self) -> None:
"""Call this before using the database at all."""
if self._pid != os.getpid():
# Looks like we forked! Have to start a new data file.
@@ -811,15 +832,15 @@ class CoverageData(AutoReprMixin):
self.erase()
self._have_used = True
- def has_arcs(self):
+ def has_arcs(self) -> bool:
"""Does the database have arcs (True) or lines (False)."""
return bool(self._has_arcs)
- def measured_files(self):
+ def measured_files(self) -> Set[str]:
"""A set of all files that had been measured."""
return set(self._file_map)
- def measured_contexts(self):
+ def measured_contexts(self) -> Set[str]:
"""A set of all contexts that have been measured.
.. versionadded:: 5.0
@@ -831,7 +852,7 @@ class CoverageData(AutoReprMixin):
contexts = {row[0] for row in cur}
return contexts
- def file_tracer(self, filename):
+ def file_tracer(self, filename: str) -> Optional[str]:
"""Get the plugin name of the file tracer for a file.
Returns the name of the plugin that handles this file. If the file was
@@ -849,7 +870,7 @@ class CoverageData(AutoReprMixin):
return row[0] or ""
return "" # File was measured, but no tracer associated.
- def set_query_context(self, context):
+ def set_query_context(self, context: str) -> None:
"""Set a context for subsequent querying.
The next :meth:`lines`, :meth:`arcs`, or :meth:`contexts_by_lineno`
@@ -865,7 +886,7 @@ class CoverageData(AutoReprMixin):
with con.execute("select id from context where context = ?", (context,)) as cur:
self._query_context_ids = [row[0] for row in cur.fetchall()]
- def set_query_contexts(self, contexts):
+ def set_query_contexts(self, contexts: Sequence[str]) -> None:
"""Set a number of contexts for subsequent querying.
The next :meth:`lines`, :meth:`arcs`, or :meth:`contexts_by_lineno`
@@ -886,7 +907,7 @@ class CoverageData(AutoReprMixin):
else:
self._query_context_ids = None
- def lines(self, filename):
+ def lines(self, filename: str) -> Optional[List[TLineNo]]:
"""Get the list of lines executed for a source file.
If the file was not measured, returns None. A file might be measured,
@@ -921,7 +942,7 @@ class CoverageData(AutoReprMixin):
nums.update(numbits_to_nums(row[0]))
return list(nums)
- def arcs(self, filename):
+ def arcs(self, filename: str) -> Optional[List[TArc]]:
"""Get the list of arcs executed for a file.
If the file was not measured, returns None. A file might be measured,
@@ -953,7 +974,7 @@ class CoverageData(AutoReprMixin):
with con.execute(query, data) as cur:
return list(cur)
- def contexts_by_lineno(self, filename):
+ def contexts_by_lineno(self, filename: str) -> Dict[TLineNo, List[str]]:
"""Get the contexts for each line in a file.
Returns:
@@ -1005,7 +1026,7 @@ class CoverageData(AutoReprMixin):
return {lineno: list(contexts) for lineno, contexts in lineno_contexts_map.items()}
@classmethod
- def sys_info(cls):
+ def sys_info(cls) -> List[Tuple[str, Any]]:
"""Our information for `Coverage.sys_info`.
Returns a list of (key, value) pairs.
@@ -1025,7 +1046,7 @@ class CoverageData(AutoReprMixin):
]
-def filename_suffix(suffix):
+def filename_suffix(suffix: Union[str, bool, None]) -> Union[str, None]:
"""Compute a filename suffix for a data file.
If `suffix` is a string or None, simply return it. If `suffix` is True,
@@ -1042,6 +1063,8 @@ def filename_suffix(suffix):
# if the process forks.
dice = random.Random(os.urandom(8)).randint(0, 999999)
suffix = "%s.%s.%06d" % (socket.gethostname(), os.getpid(), dice)
+ elif suffix is False:
+ suffix = None
return suffix
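With the Union[str, bool, None] signature, the False case now narrows explicitly to None. Expected behaviour per the docstring above, as a hedged usage sketch (assuming coverage.py at or after this commit is importable):

from coverage.sqldata import filename_suffix

assert filename_suffix(None) is None           # passed through unchanged
assert filename_suffix(False) is None          # False now maps to None
assert filename_suffix("ci-run-1") == "ci-run-1"
print(filename_suffix(True))                   # e.g. "hostname.12345.123456"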
@@ -1055,13 +1078,13 @@ class SqliteDb(AutoReprMixin):
db.execute("insert into schema (version) values (?)", (SCHEMA_VERSION,))
"""
- def __init__(self, filename, debug):
+ def __init__(self, filename: str, debug: TDebugCtl) -> None:
self.debug = debug
self.filename = filename
self.nest = 0
- self.con = None
+ self.con: Optional[sqlite3.Connection] = None
- def _connect(self):
+ def _connect(self) -> None:
"""Connect to the db and do universal initialization."""
if self.con is not None:
return
@@ -1087,23 +1110,25 @@ class SqliteDb(AutoReprMixin):
# This pragma makes writing faster.
self.execute_void("pragma synchronous=off")
- def close(self):
+ def close(self) -> None:
"""If needed, close the connection."""
if self.con is not None and self.filename != ":memory:":
self.con.close()
self.con = None
- def __enter__(self):
+ def __enter__(self) -> SqliteDb:
if self.nest == 0:
self._connect()
+ assert self.con is not None
self.con.__enter__()
self.nest += 1
return self
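The added "assert self.con is not None" lines are how this diff narrows the Optional[sqlite3.Connection] attribute for mypy before each use. The general pattern, as a small sketch with invented names:

from typing import Optional

class Holder:
    def __init__(self) -> None:
        self.value: Optional[str] = None   # filled in later, e.g. by a connect step

    def use(self) -> str:
        assert self.value is not None      # narrows Optional[str] to str
        return self.value.upper()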
- def __exit__(self, exc_type, exc_value, traceback):
+ def __exit__(self, exc_type, exc_value, traceback) -> None: # type: ignore[no-untyped-def]
self.nest -= 1
if self.nest == 0:
try:
+ assert self.con is not None
self.con.__exit__(exc_type, exc_value, traceback)
self.close()
except Exception as exc:
@@ -1111,19 +1136,20 @@ class SqliteDb(AutoReprMixin):
self.debug.write(f"EXCEPTION from __exit__: {exc}")
raise DataError(f"Couldn't end data file {self.filename!r}: {exc}") from exc
- def _execute(self, sql, parameters):
+ def _execute(self, sql: str, parameters: Iterable[Any]) -> sqlite3.Cursor:
"""Same as :meth:`python:sqlite3.Connection.execute`."""
if self.debug.should("sql"):
tail = f" with {parameters!r}" if parameters else ""
self.debug.write(f"Executing {sql!r}{tail}")
try:
+ assert self.con is not None
try:
- return self.con.execute(sql, parameters)
+ return self.con.execute(sql, parameters) # type: ignore[arg-type]
except Exception:
# In some cases, an error might happen that isn't really an
# error. Try again immediately.
# https://github.com/nedbat/coveragepy/issues/1010
- return self.con.execute(sql, parameters)
+ return self.con.execute(sql, parameters) # type: ignore[arg-type]
except sqlite3.Error as exc:
msg = str(exc)
try:
@@ -1143,7 +1169,11 @@ class SqliteDb(AutoReprMixin):
raise DataError(f"Couldn't use data file {self.filename!r}: {msg}") from exc
@contextlib.contextmanager
- def execute(self, sql, parameters=()):
+ def execute(
+ self,
+ sql: str,
+ parameters: Iterable[Any]=(),
+ ) -> Generator[sqlite3.Cursor, None, None]:
"""Context managed :meth:`python:sqlite3.Connection.execute`.
Use with a ``with`` statement to auto-close the returned cursor.
@@ -1154,19 +1184,20 @@ class SqliteDb(AutoReprMixin):
finally:
cur.close()
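A contextlib.contextmanager that yields a cursor is annotated as Generator[sqlite3.Cursor, None, None], as in execute above. A self-contained sketch of the same shape (hypothetical function name, assuming an already-open connection):

import contextlib
import sqlite3
from typing import Any, Generator, Sequence

@contextlib.contextmanager
def run_query(
    con: sqlite3.Connection,
    sql: str,
    parameters: Sequence[Any] = (),
) -> Generator[sqlite3.Cursor, None, None]:
    """Yield a cursor and guarantee it is closed afterwards."""
    cur = con.execute(sql, parameters)
    try:
        yield cur
    finally:
        cur.close()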
- def execute_void(self, sql, parameters=()):
+ def execute_void(self, sql: str, parameters: Iterable[Any]=()) -> None:
"""Same as :meth:`python:sqlite3.Connection.execute` when you don't need the cursor."""
self._execute(sql, parameters).close()
- def execute_for_rowid(self, sql, parameters=()):
+ def execute_for_rowid(self, sql: str, parameters: Iterable[Any]=()) -> int:
"""Like execute, but returns the lastrowid."""
with self.execute(sql, parameters) as cur:
- rowid = cur.lastrowid
+ assert cur.lastrowid is not None
+ rowid: int = cur.lastrowid
if self.debug.should("sqldata"):
self.debug.write(f"Row id result: {rowid!r}")
return rowid
- def execute_one(self, sql, parameters=()):
+ def execute_one(self, sql: str, parameters: Iterable[Any]=()) -> Optional[Tuple[Any, ...]]:
"""Execute a statement and return the one row that results.
This is like execute(sql, parameters).fetchone(), except it is
@@ -1180,11 +1211,11 @@ class SqliteDb(AutoReprMixin):
if len(rows) == 0:
return None
elif len(rows) == 1:
- return rows[0]
+ return cast(Tuple[Any, ...], rows[0])
else:
raise AssertionError(f"SQL {sql!r} shouldn't return {len(rows)} rows")
- def _executemany(self, sql, data):
+ def _executemany(self, sql: str, data: Iterable[Any]) -> sqlite3.Cursor:
"""Same as :meth:`python:sqlite3.Connection.executemany`."""
if self.debug.should("sql"):
data = list(data)
@@ -1193,6 +1224,7 @@ class SqliteDb(AutoReprMixin):
if self.debug.should("sqldata"):
for i, row in enumerate(data):
self.debug.write(f"{i:4d}: {row!r}")
+ assert self.con is not None
try:
return self.con.executemany(sql, data)
except Exception: # pragma: cant happen
@@ -1202,7 +1234,7 @@ class SqliteDb(AutoReprMixin):
return self.con.executemany(sql, data)
@contextlib.contextmanager
- def executemany(self, sql, data):
+ def executemany(self, sql: str, data: Iterable[Any]) -> Generator[sqlite3.Cursor, None, None]:
"""Context managed :meth:`python:sqlite3.Connection.executemany`.
Use with a ``with`` statement to auto-close the returned cursor.
@@ -1213,18 +1245,20 @@ class SqliteDb(AutoReprMixin):
finally:
cur.close()
- def executemany_void(self, sql, data):
+ def executemany_void(self, sql: str, data: Iterable[Any]) -> None:
"""Same as :meth:`python:sqlite3.Connection.executemany` when you don't need the cursor."""
self._executemany(sql, data).close()
- def executescript(self, script):
+ def executescript(self, script: str) -> None:
"""Same as :meth:`python:sqlite3.Connection.executescript`."""
if self.debug.should("sql"):
self.debug.write("Executing script with {} chars: {}".format(
len(script), clipped_repr(script, 100),
))
+ assert self.con is not None
self.con.executescript(script).close()
- def dump(self):
+ def dump(self) -> str:
"""Return a multi-line string, the SQL dump of the database."""
+ assert self.con is not None
return "\n".join(self.con.iterdump())