diff options
Diffstat (limited to 'coverage/sqldata.py')
-rw-r--r-- | coverage/sqldata.py | 210 |
1 files changed, 122 insertions, 88 deletions
diff --git a/coverage/sqldata.py b/coverage/sqldata.py index 4caa13d2..2a42e122 100644 --- a/coverage/sqldata.py +++ b/coverage/sqldata.py @@ -3,6 +3,8 @@ """SQLite coverage data.""" +from __future__ import annotations + import collections import contextlib import datetime @@ -19,16 +21,22 @@ import textwrap import threading import zlib +from typing import ( + cast, Any, Callable, Dict, Generator, Iterable, List, Optional, + Sequence, Set, Tuple, TypeVar, Union, +) + from coverage.debug import NoDebugging, AutoReprMixin, clipped_repr from coverage.exceptions import CoverageException, DataError from coverage.files import PathAliases -from coverage.misc import contract, file_be_gone, isolate_module +from coverage.misc import file_be_gone, isolate_module from coverage.numbits import numbits_to_nums, numbits_union, nums_to_numbits +from coverage.types import TArc, TDebugCtl, TLineNo, TWarnFn from coverage.version import __version__ os = isolate_module(os) -# If you change the schema, increment the SCHEMA_VERSION, and update the +# If you change the schema: increment the SCHEMA_VERSION and update the # docs in docs/dbschema.rst by running "make cogdoc". SCHEMA_VERSION = 7 @@ -104,6 +112,21 @@ CREATE TABLE tracer ( ); """ +TMethod = TypeVar("TMethod", bound=Callable[..., Any]) + +def _locked(method: TMethod) -> TMethod: + """A decorator for methods that should hold self._lock.""" + @functools.wraps(method) + def _wrapped(self: CoverageData, *args: Any, **kwargs: Any) -> Any: + if self._debug.should("lock"): + self._debug.write(f"Locking {self._lock!r} for {method.__name__}") + with self._lock: + if self._debug.should("lock"): + self._debug.write(f"Locked {self._lock!r} for {method.__name__}") + return method(self, *args, **kwargs) + return _wrapped # type: ignore[return-value] + + class CoverageData(AutoReprMixin): """Manages collected coverage data, including file storage. @@ -187,7 +210,14 @@ class CoverageData(AutoReprMixin): """ - def __init__(self, basename=None, suffix=None, no_disk=False, warn=None, debug=None): + def __init__( + self, + basename: Optional[str]=None, + suffix: Optional[Union[str, bool]]=None, + no_disk: bool=False, + warn: Optional[TWarnFn]=None, + debug: Optional[TDebugCtl]=None, + ) -> None: """Create a :class:`CoverageData` object to hold coverage-measured data. Arguments: @@ -209,9 +239,10 @@ class CoverageData(AutoReprMixin): self._debug = debug or NoDebugging() self._choose_filename() - self._file_map = {} + # Maps filenames to row ids. + self._file_map: Dict[str, int] = {} # Maps thread ids to SqliteDb objects. - self._dbs = {} + self._dbs: Dict[int, SqliteDb] = {} self._pid = os.getpid() # Synchronize the operations used during collection. self._lock = threading.RLock() @@ -222,24 +253,11 @@ class CoverageData(AutoReprMixin): self._has_lines = False self._has_arcs = False - self._current_context = None - self._current_context_id = None - self._query_context_ids = None + self._current_context: Optional[str] = None + self._current_context_id: Optional[int] = None + self._query_context_ids: Optional[List[int]] = None - def _locked(method): # pylint: disable=no-self-argument - """A decorator for methods that should hold self._lock.""" - @functools.wraps(method) - def _wrapped(self, *args, **kwargs): - if self._debug.should("lock"): - self._debug.write(f"Locking {self._lock!r} for {method.__name__}") - with self._lock: - if self._debug.should("lock"): - self._debug.write(f"Locked {self._lock!r} for {method.__name__}") - # pylint: disable=not-callable - return method(self, *args, **kwargs) - return _wrapped - - def _choose_filename(self): + def _choose_filename(self) -> None: """Set self._filename based on inited attributes.""" if self._no_disk: self._filename = ":memory:" @@ -249,7 +267,7 @@ class CoverageData(AutoReprMixin): if suffix: self._filename += "." + suffix - def _reset(self): + def _reset(self) -> None: """Reset our attributes.""" if not self._no_disk: for db in self._dbs.values(): @@ -259,18 +277,18 @@ class CoverageData(AutoReprMixin): self._have_used = False self._current_context_id = None - def _open_db(self): + def _open_db(self) -> None: """Open an existing db file, and read its metadata.""" if self._debug.should("dataio"): self._debug.write(f"Opening data file {self._filename!r}") self._dbs[threading.get_ident()] = SqliteDb(self._filename, self._debug) self._read_db() - def _read_db(self): + def _read_db(self) -> None: """Read the metadata from a database so that we are ready to use it.""" with self._dbs[threading.get_ident()] as db: try: - schema_version, = db.execute_one("select version from coverage_schema") + row = db.execute_one("select version from coverage_schema") except Exception as exc: if "no such table: coverage_schema" in str(exc): self._init_db(db) @@ -281,6 +299,10 @@ class CoverageData(AutoReprMixin): ) ) from exc else: + if row is None: + schema_version = None + else: + schema_version = row[0] if schema_version != SCHEMA_VERSION: raise DataError( "Couldn't use data file {!r}: wrong schema: {} instead of {}".format( @@ -288,16 +310,16 @@ class CoverageData(AutoReprMixin): ) ) - with db.execute("select value from meta where key = 'has_arcs'") as cur: - for row in cur: - self._has_arcs = bool(int(row[0])) - self._has_lines = not self._has_arcs + row = db.execute_one("select value from meta where key = 'has_arcs'") + if row is not None: + self._has_arcs = bool(int(row[0])) + self._has_lines = not self._has_arcs with db.execute("select id, path from file") as cur: for file_id, path in cur: self._file_map[path] = file_id - def _init_db(self, db): + def _init_db(self, db: SqliteDb) -> None: """Write the initial contents of the database.""" if self._debug.should("dataio"): self._debug.write(f"Initing data file {self._filename!r}") @@ -316,13 +338,13 @@ class CoverageData(AutoReprMixin): ]) db.executemany_void("insert or ignore into meta (key, value) values (?, ?)", meta_data) - def _connect(self): + def _connect(self) -> SqliteDb: """Get the SqliteDb object to use.""" if threading.get_ident() not in self._dbs: self._open_db() return self._dbs[threading.get_ident()] - def __bool__(self): + def __bool__(self) -> bool: if (threading.get_ident() not in self._dbs and not os.path.exists(self._filename)): return False try: @@ -332,8 +354,7 @@ class CoverageData(AutoReprMixin): except CoverageException: return False - @contract(returns="bytes") - def dumps(self): + def dumps(self) -> bytes: """Serialize the current data to a byte string. The format of the serialized data is not documented. It is only @@ -356,8 +377,7 @@ class CoverageData(AutoReprMixin): script = con.dump() return b"z" + zlib.compress(script.encode("utf-8")) - @contract(data="bytes") - def loads(self, data): + def loads(self, data: bytes) -> None: """Deserialize data from :meth:`dumps`. Use with a newly-created empty :class:`CoverageData` object. It's @@ -385,7 +405,7 @@ class CoverageData(AutoReprMixin): self._read_db() self._have_used = True - def _file_id(self, filename, add=False): + def _file_id(self, filename: str, add: bool=False) -> Optional[int]: """Get the file id for `filename`. If filename is not in the database yet, add it if `add` is True. @@ -400,19 +420,19 @@ class CoverageData(AutoReprMixin): ) return self._file_map.get(filename) - def _context_id(self, context): + def _context_id(self, context: str) -> Optional[int]: """Get the id for a context.""" assert context is not None self._start_using() with self._connect() as con: row = con.execute_one("select id from context where context = ?", (context,)) if row is not None: - return row[0] + return cast(int, row[0]) else: return None @_locked - def set_context(self, context): + def set_context(self, context: str) -> None: """Set the current context for future :meth:`add_lines` etc. `context` is a str, the name of the context to use for the next data @@ -426,7 +446,7 @@ class CoverageData(AutoReprMixin): self._current_context = context self._current_context_id = None - def _set_context_id(self): + def _set_context_id(self) -> None: """Use the _current_context to set _current_context_id.""" context = self._current_context or "" context_id = self._context_id(context) @@ -439,7 +459,7 @@ class CoverageData(AutoReprMixin): (context,) ) - def base_filename(self): + def base_filename(self) -> str: """The base filename for storing data. .. versionadded:: 5.0 @@ -447,7 +467,7 @@ class CoverageData(AutoReprMixin): """ return self._basename - def data_filename(self): + def data_filename(self) -> str: """Where is the data stored? .. versionadded:: 5.0 @@ -456,7 +476,7 @@ class CoverageData(AutoReprMixin): return self._filename @_locked - def add_lines(self, line_data): + def add_lines(self, line_data: Dict[str, Sequence[TLineNo]]) -> None: """Add measured line data. `line_data` is a dictionary mapping file names to iterables of ints:: @@ -466,7 +486,7 @@ class CoverageData(AutoReprMixin): """ if self._debug.should("dataop"): self._debug.write("Adding lines: %d files, %d lines total" % ( - len(line_data), sum(len(lines) for lines in line_data.values()) + len(line_data), sum(bool(len(lines)) for lines in line_data.values()) )) self._start_using() self._choose_lines_or_arcs(lines=True) @@ -490,7 +510,7 @@ class CoverageData(AutoReprMixin): ) @_locked - def add_arcs(self, arc_data): + def add_arcs(self, arc_data: Dict[str, Set[TArc]]) -> None: """Add measured arc data. `arc_data` is a dictionary mapping file names to iterables of pairs of @@ -518,7 +538,7 @@ class CoverageData(AutoReprMixin): data, ) - def _choose_lines_or_arcs(self, lines=False, arcs=False): + def _choose_lines_or_arcs(self, lines: bool=False, arcs: bool=False) -> None: """Force the data file to choose between lines and arcs.""" assert lines or arcs assert not (lines and arcs) @@ -540,7 +560,7 @@ class CoverageData(AutoReprMixin): ) @_locked - def add_file_tracers(self, file_tracers): + def add_file_tracers(self, file_tracers: Dict[str, str]) -> None: """Add per-file plugin information. `file_tracers` is { filename: plugin_name, ... } @@ -573,7 +593,7 @@ class CoverageData(AutoReprMixin): (file_id, plugin_name) ) - def touch_file(self, filename, plugin_name=""): + def touch_file(self, filename: str, plugin_name: str="") -> None: """Ensure that `filename` appears in the data, empty if needed. `plugin_name` is the name of the plugin responsible for this file. It is used @@ -581,7 +601,7 @@ class CoverageData(AutoReprMixin): """ self.touch_files([filename], plugin_name) - def touch_files(self, filenames, plugin_name=""): + def touch_files(self, filenames: Iterable[str], plugin_name: str="") -> None: """Ensure that `filenames` appear in the data, empty if needed. `plugin_name` is the name of the plugin responsible for these files. It is used @@ -600,7 +620,7 @@ class CoverageData(AutoReprMixin): # Set the tracer for this file self.add_file_tracers({filename: plugin_name}) - def update(self, other_data, aliases=None): + def update(self, other_data: CoverageData, aliases: Optional[PathAliases]=None) -> None: """Update this data with data from several other :class:`CoverageData` instances. If `aliases` is provided, it's a `PathAliases` object that is used to @@ -652,7 +672,7 @@ class CoverageData(AutoReprMixin): "inner join file on file.id = line_bits.file_id " + "inner join context on context.id = line_bits.context_id" ) as cur: - lines = {} + lines: Dict[Tuple[str, str], bytes] = {} for path, context, numbits in cur: key = (files[path], context) if key in lines: @@ -668,6 +688,7 @@ class CoverageData(AutoReprMixin): tracers = {files[path]: tracer for (path, tracer) in cur} with self._connect() as con: + assert con.con is not None con.con.isolation_level = "IMMEDIATE" # Get all tracers in the DB. Files not in the tracers are assumed @@ -768,7 +789,7 @@ class CoverageData(AutoReprMixin): self._reset() self.read() - def erase(self, parallel=False): + def erase(self, parallel: bool=False) -> None: """Erase the data in this object. If `parallel` is true, then also deletes data files created from the @@ -790,17 +811,17 @@ class CoverageData(AutoReprMixin): self._debug.write(f"Erasing parallel data file {filename!r}") file_be_gone(filename) - def read(self): + def read(self) -> None: """Start using an existing data file.""" if os.path.exists(self._filename): with self._connect(): self._have_used = True - def write(self): + def write(self) -> None: """Ensure the data is written to the data file.""" pass - def _start_using(self): + def _start_using(self) -> None: """Call this before using the database at all.""" if self._pid != os.getpid(): # Looks like we forked! Have to start a new data file. @@ -811,15 +832,15 @@ class CoverageData(AutoReprMixin): self.erase() self._have_used = True - def has_arcs(self): + def has_arcs(self) -> bool: """Does the database have arcs (True) or lines (False).""" return bool(self._has_arcs) - def measured_files(self): + def measured_files(self) -> Set[str]: """A set of all files that had been measured.""" return set(self._file_map) - def measured_contexts(self): + def measured_contexts(self) -> Set[str]: """A set of all contexts that have been measured. .. versionadded:: 5.0 @@ -831,7 +852,7 @@ class CoverageData(AutoReprMixin): contexts = {row[0] for row in cur} return contexts - def file_tracer(self, filename): + def file_tracer(self, filename: str) -> Optional[str]: """Get the plugin name of the file tracer for a file. Returns the name of the plugin that handles this file. If the file was @@ -849,7 +870,7 @@ class CoverageData(AutoReprMixin): return row[0] or "" return "" # File was measured, but no tracer associated. - def set_query_context(self, context): + def set_query_context(self, context: str) -> None: """Set a context for subsequent querying. The next :meth:`lines`, :meth:`arcs`, or :meth:`contexts_by_lineno` @@ -865,7 +886,7 @@ class CoverageData(AutoReprMixin): with con.execute("select id from context where context = ?", (context,)) as cur: self._query_context_ids = [row[0] for row in cur.fetchall()] - def set_query_contexts(self, contexts): + def set_query_contexts(self, contexts: Sequence[str]) -> None: """Set a number of contexts for subsequent querying. The next :meth:`lines`, :meth:`arcs`, or :meth:`contexts_by_lineno` @@ -886,7 +907,7 @@ class CoverageData(AutoReprMixin): else: self._query_context_ids = None - def lines(self, filename): + def lines(self, filename: str) -> Optional[List[TLineNo]]: """Get the list of lines executed for a source file. If the file was not measured, returns None. A file might be measured, @@ -921,7 +942,7 @@ class CoverageData(AutoReprMixin): nums.update(numbits_to_nums(row[0])) return list(nums) - def arcs(self, filename): + def arcs(self, filename: str) -> Optional[List[TArc]]: """Get the list of arcs executed for a file. If the file was not measured, returns None. A file might be measured, @@ -953,7 +974,7 @@ class CoverageData(AutoReprMixin): with con.execute(query, data) as cur: return list(cur) - def contexts_by_lineno(self, filename): + def contexts_by_lineno(self, filename: str) -> Dict[TLineNo, List[str]]: """Get the contexts for each line in a file. Returns: @@ -1005,7 +1026,7 @@ class CoverageData(AutoReprMixin): return {lineno: list(contexts) for lineno, contexts in lineno_contexts_map.items()} @classmethod - def sys_info(cls): + def sys_info(cls) -> List[Tuple[str, Any]]: """Our information for `Coverage.sys_info`. Returns a list of (key, value) pairs. @@ -1025,7 +1046,7 @@ class CoverageData(AutoReprMixin): ] -def filename_suffix(suffix): +def filename_suffix(suffix: Union[str, bool, None]) -> Union[str, None]: """Compute a filename suffix for a data file. If `suffix` is a string or None, simply return it. If `suffix` is True, @@ -1042,6 +1063,8 @@ def filename_suffix(suffix): # if the process forks. dice = random.Random(os.urandom(8)).randint(0, 999999) suffix = "%s.%s.%06d" % (socket.gethostname(), os.getpid(), dice) + elif suffix is False: + suffix = None return suffix @@ -1055,13 +1078,13 @@ class SqliteDb(AutoReprMixin): db.execute("insert into schema (version) values (?)", (SCHEMA_VERSION,)) """ - def __init__(self, filename, debug): + def __init__(self, filename: str, debug: TDebugCtl) -> None: self.debug = debug self.filename = filename self.nest = 0 - self.con = None + self.con: Optional[sqlite3.Connection] = None - def _connect(self): + def _connect(self) -> None: """Connect to the db and do universal initialization.""" if self.con is not None: return @@ -1087,23 +1110,25 @@ class SqliteDb(AutoReprMixin): # This pragma makes writing faster. self.execute_void("pragma synchronous=off") - def close(self): + def close(self) -> None: """If needed, close the connection.""" if self.con is not None and self.filename != ":memory:": self.con.close() self.con = None - def __enter__(self): + def __enter__(self) -> SqliteDb: if self.nest == 0: self._connect() + assert self.con is not None self.con.__enter__() self.nest += 1 return self - def __exit__(self, exc_type, exc_value, traceback): + def __exit__(self, exc_type, exc_value, traceback) -> None: # type: ignore[no-untyped-def] self.nest -= 1 if self.nest == 0: try: + assert self.con is not None self.con.__exit__(exc_type, exc_value, traceback) self.close() except Exception as exc: @@ -1111,19 +1136,20 @@ class SqliteDb(AutoReprMixin): self.debug.write(f"EXCEPTION from __exit__: {exc}") raise DataError(f"Couldn't end data file {self.filename!r}: {exc}") from exc - def _execute(self, sql, parameters): + def _execute(self, sql: str, parameters: Iterable[Any]) -> sqlite3.Cursor: """Same as :meth:`python:sqlite3.Connection.execute`.""" if self.debug.should("sql"): tail = f" with {parameters!r}" if parameters else "" self.debug.write(f"Executing {sql!r}{tail}") try: + assert self.con is not None try: - return self.con.execute(sql, parameters) + return self.con.execute(sql, parameters) # type: ignore[arg-type] except Exception: # In some cases, an error might happen that isn't really an # error. Try again immediately. # https://github.com/nedbat/coveragepy/issues/1010 - return self.con.execute(sql, parameters) + return self.con.execute(sql, parameters) # type: ignore[arg-type] except sqlite3.Error as exc: msg = str(exc) try: @@ -1143,7 +1169,11 @@ class SqliteDb(AutoReprMixin): raise DataError(f"Couldn't use data file {self.filename!r}: {msg}") from exc @contextlib.contextmanager - def execute(self, sql, parameters=()): + def execute( + self, + sql: str, + parameters: Iterable[Any]=(), + ) -> Generator[sqlite3.Cursor, None, None]: """Context managed :meth:`python:sqlite3.Connection.execute`. Use with a ``with`` statement to auto-close the returned cursor. @@ -1154,19 +1184,20 @@ class SqliteDb(AutoReprMixin): finally: cur.close() - def execute_void(self, sql, parameters=()): + def execute_void(self, sql: str, parameters: Iterable[Any]=()) -> None: """Same as :meth:`python:sqlite3.Connection.execute` when you don't need the cursor.""" self._execute(sql, parameters).close() - def execute_for_rowid(self, sql, parameters=()): + def execute_for_rowid(self, sql: str, parameters: Iterable[Any]=()) -> int: """Like execute, but returns the lastrowid.""" with self.execute(sql, parameters) as cur: - rowid = cur.lastrowid + assert cur.lastrowid is not None + rowid: int = cur.lastrowid if self.debug.should("sqldata"): self.debug.write(f"Row id result: {rowid!r}") return rowid - def execute_one(self, sql, parameters=()): + def execute_one(self, sql: str, parameters: Iterable[Any]=()) -> Optional[Tuple[Any, ...]]: """Execute a statement and return the one row that results. This is like execute(sql, parameters).fetchone(), except it is @@ -1180,11 +1211,11 @@ class SqliteDb(AutoReprMixin): if len(rows) == 0: return None elif len(rows) == 1: - return rows[0] + return cast(Tuple[Any, ...], rows[0]) else: raise AssertionError(f"SQL {sql!r} shouldn't return {len(rows)} rows") - def _executemany(self, sql, data): + def _executemany(self, sql: str, data: Iterable[Any]) -> sqlite3.Cursor: """Same as :meth:`python:sqlite3.Connection.executemany`.""" if self.debug.should("sql"): data = list(data) @@ -1193,6 +1224,7 @@ class SqliteDb(AutoReprMixin): if self.debug.should("sqldata"): for i, row in enumerate(data): self.debug.write(f"{i:4d}: {row!r}") + assert self.con is not None try: return self.con.executemany(sql, data) except Exception: # pragma: cant happen @@ -1202,7 +1234,7 @@ class SqliteDb(AutoReprMixin): return self.con.executemany(sql, data) @contextlib.contextmanager - def executemany(self, sql, data): + def executemany(self, sql: str, data: Iterable[Any]) -> Generator[sqlite3.Cursor, None, None]: """Context managed :meth:`python:sqlite3.Connection.executemany`. Use with a ``with`` statement to auto-close the returned cursor. @@ -1213,18 +1245,20 @@ class SqliteDb(AutoReprMixin): finally: cur.close() - def executemany_void(self, sql, data): + def executemany_void(self, sql: str, data: Iterable[Any]) -> None: """Same as :meth:`python:sqlite3.Connection.executemany` when you don't need the cursor.""" self._executemany(sql, data).close() - def executescript(self, script): + def executescript(self, script: str) -> None: """Same as :meth:`python:sqlite3.Connection.executescript`.""" if self.debug.should("sql"): self.debug.write("Executing script with {} chars: {}".format( len(script), clipped_repr(script, 100), )) + assert self.con is not None self.con.executescript(script).close() - def dump(self): + def dump(self) -> str: """Return a multi-line string, the SQL dump of the database.""" + assert self.con is not None return "\n".join(self.con.iterdump()) |