summaryrefslogtreecommitdiff
path: root/coverage/data.py
blob: 16bcf8f9b1b62b8507b8d55e2879fecc24678156 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""Coverage data for coverage.py"""

import os, marshal, socket, types

class CoverageData:
    """Manages collected coverage data."""
    # Name of the data file (unless environment variable is set).
    filename_default = ".coverage"

    # Environment variable naming the data file.
    filename_env = "COVERAGE_FILE"

    def __init__(self):
        self.filename = None
        self.use_file = True

        # A map from canonical Python source file name to a dictionary in
        # which there's an entry for each line number that has been
        # executed:
        #
        #   {
        #       'filename1.py': { 12: True, 47: True, ... },
        #       ...
        #       }
        #
        self.executed = {}
        
    def usefile(self, use_file=True, filename_default=None):
        self.use_file = use_file
        if filename_default and not self.filename:
            self.filename_default = filename_default

    def read(self, parallel=False):
        """Read coverage data from the coverage data file (if it exists)."""
        data = {}
        if self.use_file and not self.filename:
            self.filename = os.environ.get(
                                    self.filename_env, self.filename_default)
            if parallel:
                self.filename += "." + socket.gethostname()
                self.filename += "." + str(os.getpid())
            if os.path.exists(self.filename):
                data = self._read_file(self.filename)
        self.executed = data

    def write(self):
        """Write the collected coverage data to a file."""
        if self.use_file and self.filename:
            self.write_file(self.filename)
            
    def erase(self):
        if self.filename and os.path.exists(self.filename):
            os.remove(self.filename)

    def write_file(self, filename):
        """Write the coverage data to `filename`."""
        f = open(filename, 'wb')
        try:
            marshal.dump(self.executed, f)
        finally:
            f.close()

    def read_file(self, filename):
        self.executed = self._read_file(filename)
        
    def _read_file(self, filename):
        """ Return the stored coverage data from the given file.
        """
        try:
            fdata = open(filename, 'rb')
            executed = marshal.load(fdata)
            fdata.close()
            if isinstance(executed, types.DictType):
                return executed
            else:
                return {}
        except:
            return {}

    def combine_parallel_data(self):
        """ Treat self.filename as a file prefix, and combine the data from all
            of the files starting with that prefix.
        """
        data_dir, local = os.path.split(self.filename)
        for f in os.listdir(data_dir or '.'):
            if f.startswith(local):
                full_path = os.path.join(data_dir, f)
                file_data = self._read_file(full_path)
                self._combine_data(file_data)

    def _combine_data(self, new_data):
        """Combine the `new_data` into `executed`."""
        for filename, file_data in new_data.items():
            self.executed.setdefault(filename, {}).update(file_data)

    def add_raw_data(self, data_points):
        """Add raw data.
        
        `data_points` is (filename, lineno) pairs.
        
        """
        for filename, lineno in data_points:
            self.executed.setdefault(filename, {})[lineno] = True

    def executed_files(self):
        """A list of all files that had been measured as executed."""
        return self.executed.keys()

    def executed_lines(self, filename):
        """A map containing all the line numbers executed in `filename`."""
        return self.executed[filename]

    def summary(self):
        """Return a dict summarizing the coverage data.
        
        Keys are the basename of the filenames, and values are the number of
        executed lines.  This is useful in the unit tests.
        
        """
        summ = {}
        for filename, lines in self.executed.items():
            summ[os.path.basename(filename)] = len(lines)
        return summ