summaryrefslogtreecommitdiff
path: root/coverage/numbits.py
diff options
context:
space:
mode:
authorNed Batchelder <ned@nedbatchelder.com>2019-08-11 16:13:57 -0400
committerNed Batchelder <ned@nedbatchelder.com>2019-08-11 16:13:57 -0400
commitfedaa98daab82a2a8ce0a76fe5cb9dc1c1c92eac (patch)
tree3fc09a4dcd643832aefefaa4573119bfcb41358f /coverage/numbits.py
parentaf9830596d53b43efea837a67efa2c683f69793f (diff)
downloadpython-coveragepy-git-fedaa98daab82a2a8ce0a76fe5cb9dc1c1c92eac.tar.gz
Improved numbits operations
* Better names (merge -> union) * More ops (intersection) * Can be registered for SQLite use * Numbits can be empty Also, line_map is a dumb table name. line_bits is marginally better.
Diffstat (limited to 'coverage/numbits.py')
-rw-r--r--coverage/numbits.py144
1 files changed, 125 insertions, 19 deletions
diff --git a/coverage/numbits.py b/coverage/numbits.py
index abe40f41..4b340c8e 100644
--- a/coverage/numbits.py
+++ b/coverage/numbits.py
@@ -5,29 +5,62 @@
Functions to manipulate packed binary representations of number sets.
To save space, coverage stores sets of line numbers in SQLite using a packed
-binary representation called a numbits. A numbits is stored as a blob in the
-database. The exact meaning of the bytes in the blobs should be considered an
-implementation detail that might change in the future. Use these functions to
-work with those binary blobs of data.
+binary representation called a numbits. A numbits is a set of positive
+integers.
+
+A numbits is stored as a blob in the database. The exact meaning of the bytes
+in the blobs should be considered an implementation detail that might change in
+the future. Use these functions to work with those binary blobs of data.
"""
+from coverage import env
from coverage.backward import byte_to_int, bytes_to_ints, binary_bytes, zip_longest
-from coverage.misc import contract
+from coverage.misc import contract, new_contract
+
+if env.PY3:
+ def _to_blob(b):
+ """Convert a bytestring into a type SQLite will accept for a blob."""
+ return b
+ new_contract('blob', lambda v: isinstance(v, bytes))
+else:
+ def _to_blob(b):
+ """Convert a bytestring into a type SQLite will accept for a blob."""
+ return buffer(b) # pylint: disable=undefined-variable
-@contract(nums='Iterable', returns='bytes')
+ new_contract('blob', lambda v: isinstance(v, buffer)) # pylint: disable=undefined-variable
+
+@contract(nums='Iterable', returns='blob')
def nums_to_numbits(nums):
- """Convert `nums` (a non-empty iterable of ints) into a numbits."""
- nbytes = max(nums) // 8 + 1
+ """Convert `nums` into a numbits.
+
+ Arguments:
+ nums (a reusable iterable of integers): the line numbers to store.
+
+ Returns:
+ A binary blob.
+ """
+ try:
+ nbytes = max(nums) // 8 + 1
+ except ValueError:
+ # nums was empty.
+ return _to_blob(b'')
b = bytearray(nbytes)
for num in nums:
b[num//8] |= 1 << num % 8
- return bytes(b)
+ return _to_blob(bytes(b))
-@contract(numbits='bytes', returns='list[int]')
+@contract(numbits='blob', returns='list[int]')
def numbits_to_nums(numbits):
- """Convert a numbits into a list of numbers."""
+ """Convert a numbits into a list of numbers.
+
+ Arguments:
+ numbits (a binary blob): the packed number set.
+
+ Returns:
+ A list of integers.
+ """
nums = []
for byte_i, byte in enumerate(bytes_to_ints(numbits)):
for bit_i in range(8):
@@ -35,22 +68,95 @@ def numbits_to_nums(numbits):
nums.append(byte_i * 8 + bit_i)
return nums
-@contract(numbits1='bytes', numbits2='bytes', returns='bytes')
-def merge_numbits(numbits1, numbits2):
- """Merge two numbits"""
+@contract(numbits1='blob', numbits2='blob', returns='blob')
+def numbits_union(numbits1, numbits2):
+ """Compute the union of two numbits.
+
+ Arguments:
+ numbits1, numbits2: packed number sets.
+
+ Returns:
+ A new numbits, the union of the two number sets.
+ """
+ byte_pairs = zip_longest(bytes_to_ints(numbits1), bytes_to_ints(numbits2), fillvalue=0)
+ return _to_blob(binary_bytes(b1 | b2 for b1, b2 in byte_pairs))
+
+@contract(numbits1='blob', numbits2='blob', returns='blob')
+def numbits_intersection(numbits1, numbits2):
+ """Compute the intersection of two numbits.
+
+ Arguments:
+ numbits1, numbits2: packed number sets.
+
+ Returns:
+ A new numbits, the intersection of the two number sets.
+ """
byte_pairs = zip_longest(bytes_to_ints(numbits1), bytes_to_ints(numbits2), fillvalue=0)
- return binary_bytes(b1 | b2 for b1, b2 in byte_pairs)
+ intersection_bytes = binary_bytes(b1 & b2 for b1, b2 in byte_pairs)
+ return _to_blob(intersection_bytes.rstrip(b'\0'))
-@contract(numbits1='bytes', numbits2='bytes', returns='bool')
+@contract(numbits1='blob', numbits2='blob', returns='bool')
def numbits_any_intersection(numbits1, numbits2):
- """Is there any number that appears in both numbits?"""
+ """Is there any number that appears in both numbits?
+
+ Determine whether two number sets have a non-empty intersection. This is
+ faster than computing the intersection.
+
+ Arguments:
+ numbits1, numbits2: packed number sets.
+
+ Returns:
+ A boolean, true if there is any number in both of the number sets.
+ """
byte_pairs = zip_longest(bytes_to_ints(numbits1), bytes_to_ints(numbits2), fillvalue=0)
return any(b1 & b2 for b1, b2 in byte_pairs)
-@contract(num='int', numbits='bytes', returns='bool')
+@contract(num='int', numbits='blob', returns='bool')
def num_in_numbits(num, numbits):
- """Does the integer `num` appear in `numbits`?"""
+ """Does the integer `num` appear in `numbits`?
+
+ Arguments:
+ num (integer)
+
+ numbits (binary blob)
+
+ Returns:
+ A boolean, true if `num` is a member of `numbits`.
+ """
nbyte, nbit = divmod(num, 8)
if nbyte >= len(numbits):
return False
return bool(byte_to_int(numbits[nbyte]) & (1 << nbit))
+
+def register_sqlite_functions(connection):
+ """
+ Define numbits functions in a SQLite connection.
+
+ This defines these functions for use in SQLite statements:
+
+ * :func:`numbits_union`
+ * :func:`numbits_intersection`
+ * :func:`numbits_any_intersection`
+ * :func:`num_in_numbits`
+
+ `connection` is a :class:`sqlite3.Connection <python:sqlite3.Connection>`
+ object. After creating the connection, pass it to this function to
+ register the numbits functions. Then you can use numbits functions in your
+ queries::
+
+ import sqlite3
+ from coverage.numbits import register_sqlite_functions
+
+ conn = sqlite3.connect('example.db')
+ register_sqlite_functions(conn)
+ c = conn.cursor()
+ c.execute(
+ "select lb.file_id, lb.context_id from line_bits lb"
+ "where num_in_numbits(?, lb.numbits)",
+ (interesting_line_number,)
+ )
+ """
+ connection.create_function("numbits_union", 2, numbits_union)
+ connection.create_function("numbits_intersection", 2, numbits_intersection)
+ connection.create_function("numbits_any_intersection", 2, numbits_any_intersection)
+ connection.create_function("num_in_numbits", 2, num_in_numbits)