# data.py
"""Coverage data for Coverage."""

import os

from coverage.backward import iitems, pickle, sorted    # pylint: disable=W0622
from coverage.files import PathAliases
from coverage.misc import file_be_gone


class CoverageData(object):
    """Manages collected coverage data, including file storage.

    The data file format is a pickled dict, with these keys:

        * collector: a string identifying the collecting software

        * lines: a dict mapping filenames to sorted lists of line numbers
          executed:
            { 'file1': [17,23,45],  'file2': [1,2,3], ... }

        * arcs: a dict mapping filenames to sorted lists of line number pairs:
            { 'file1': [(17,23), (17,25), (25,26)], ... }
          This key is only written when there is arc data to store.

    In memory, the same information is kept in `self.lines` and `self.arcs`
    as dicts of dicts whose values are always None, so that merging data is
    a plain `dict.update`.

    """

    def __init__(self, basename=None, collector=None, debug=None):
        """Create a CoverageData.

        `basename` is the name of the file to use for storing data.  It
        defaults to ".coverage" and is made absolute immediately.

        `collector` is a string describing the coverage measurement software.
        It defaults to 'unknown'.

        `debug` is a `DebugControl` object for writing debug messages.  It
        may be None, in which case no debug output is produced.

        """
        self.collector = collector or 'unknown'
        self.debug = debug

        # File storage is on by default; see `usefile`.
        self.use_file = True

        # Construct the filename that will be used for data file storage, if we
        # ever do any file storage.
        self.filename = os.path.abspath(basename or ".coverage")

        # A map from canonical Python source file name to a dictionary in
        # which there's an entry for each line number that has been
        # executed:
        #
        #   {
        #       'filename1.py': { 12: None, 47: None, ... },
        #       ...
        #       }
        #
        self.lines = {}

        # A map from canonical Python source file name to a dictionary with an
        # entry for each pair of line numbers forming an arc:
        #
        #   {
        #       'filename1.py': { (12,14): None, (47,48): None, ... },
        #       ...
        #       }
        #
        self.arcs = {}

    def usefile(self, use_file=True):
        """Set whether or not to use a disk file for data."""
        self.use_file = use_file

    def read(self):
        """Read coverage data from the coverage data file (if it exists).

        When file storage is disabled, the in-memory data is simply reset to
        empty.

        """
        if self.use_file:
            self.lines, self.arcs = self._read_file(self.filename)
        else:
            self.lines, self.arcs = {}, {}

    def write(self, suffix=None):
        """Write the collected coverage data to a file.

        `suffix` is a suffix to append to the base file name. This can be used
        for multiple or parallel execution, so that many coverage data files
        can exist simultaneously.  A dot will be used to join the base name and
        the suffix.

        Nothing is written when file storage is disabled.

        """
        if self.use_file:
            filename = self.filename
            if suffix:
                filename += "." + suffix
            self.write_file(filename)

    def erase(self):
        """Erase the data, both in this object, and from its file storage.

        The data file is only removed when file storage is enabled; the
        in-memory data is always cleared.

        """
        if self.use_file and self.filename:
            file_be_gone(self.filename)
        self.lines = {}
        self.arcs = {}

    def line_data(self):
        """Return the map from filenames to lists of line numbers executed.

        The line numbers for each file are returned sorted.

        """
        return dict(
            [(f, sorted(lmap.keys())) for f, lmap in iitems(self.lines)]
            )

    def arc_data(self):
        """Return the map from filenames to lists of line number pairs.

        The pairs for each file are returned sorted.

        """
        return dict(
            [(f, sorted(amap.keys())) for f, amap in iitems(self.arcs)]
            )

    def write_file(self, filename):
        """Write the coverage data to `filename`.

        The file is written as a protocol-2 pickle of a dict in the format
        described in the class docstring.

        """
        # Create the file data.  'lines' is always present; 'arcs' and
        # 'collector' are only stored when they have a value.
        data = {}

        data['lines'] = self.line_data()
        arcs = self.arc_data()
        if arcs:
            data['arcs'] = arcs

        if self.collector:
            data['collector'] = self.collector

        if self.debug and self.debug.should('dataio'):
            self.debug.write("Writing data to %r" % (filename,))

        # Write the pickle to the file.
        fdata = open(filename, 'wb')
        try:
            pickle.dump(data, fdata, 2)
        finally:
            fdata.close()

    def read_file(self, filename):
        """Read the coverage data from `filename`.

        The current in-memory data is replaced.  If the file can't be read,
        the data becomes empty.

        """
        self.lines, self.arcs = self._read_file(filename)

    def raw_data(self, filename):
        """Return the raw pickled data from `filename`.

        Unlike `read_file`, errors opening or unpickling the file are not
        caught here: they propagate to the caller.

        """
        if self.debug and self.debug.should('dataio'):
            self.debug.write("Reading data from %r" % (filename,))
        fdata = open(filename, 'rb')
        try:
            # NOTE: unpickling can execute arbitrary code, so data files must
            # only come from trusted sources.
            data = pickle.load(fdata)
        finally:
            fdata.close()
        return data

    def _read_file(self, filename):
        """Return the stored coverage data from the given file.

        Returns two values, suitable for assigning to `self.lines` and
        `self.arcs`.

        Reading is best-effort: if the file is missing, unreadable, or not in
        the expected format, empty data is returned rather than raising.  The
        failure is reported through `self.debug` when 'dataio' debugging is
        enabled.

        """
        lines = {}
        arcs = {}
        try:
            data = self.raw_data(filename)
            if isinstance(data, dict):
                # Unpack the 'lines' item.
                lines = dict([
                    (f, dict.fromkeys(linenos, None))
                        for f, linenos in iitems(data.get('lines', {}))
                    ])
                # Unpack the 'arcs' item.
                arcs = dict([
                    (f, dict.fromkeys(arcpairs, None))
                        for f, arcpairs in iitems(data.get('arcs', {}))
                    ])
        except Exception:
            # A missing or damaged data file is deliberately not fatal, but
            # don't hide the reason from someone who asked for I/O debugging.
            if self.debug and self.debug.should('dataio'):
                import sys
                exc_type, exc = sys.exc_info()[:2]
                self.debug.write(
                    "Couldn't read data from %r: %s: %s"
                    % (filename, exc_type.__name__, exc)
                    )
        return lines, arcs

    def combine_parallel_data(self, aliases=None):
        """Combine a number of data files together.

        Treat `self.filename` as a file prefix, and combine the data from all
        of the data files starting with that prefix plus a dot.

        If `aliases` is provided, it's a `PathAliases` object that is used to
        re-map paths to match the local machine's.

        Each matching file is removed once it has been processed, including
        files from which no data could be read.

        """
        aliases = aliases or PathAliases()
        data_dir, local = os.path.split(self.filename)
        localdot = local + '.'
        for f in os.listdir(data_dir or '.'):
            if not f.startswith(localdot):
                continue
            full_path = os.path.join(data_dir, f)
            new_lines, new_arcs = self._read_file(full_path)
            for filename, file_data in iitems(new_lines):
                filename = aliases.map(filename)
                self.lines.setdefault(filename, {}).update(file_data)
            for filename, file_data in iitems(new_arcs):
                filename = aliases.map(filename)
                self.arcs.setdefault(filename, {}).update(file_data)
            # `f` starts with `local` plus a dot, so it can never be the
            # combined data file itself: it is always safe to remove.
            os.remove(full_path)

    def add_line_data(self, line_data):
        """Add executed line data.

        `line_data` is { filename: { lineno: None, ... }, ...}

        The new line numbers are merged into any already recorded for the
        same file.

        """
        for filename, linenos in iitems(line_data):
            self.lines.setdefault(filename, {}).update(linenos)

    def add_arc_data(self, arc_data):
        """Add measured arc data.

        `arc_data` is { filename: { (l1,l2): None, ... }, ...}

        The new arcs are merged into any already recorded for the same file.

        """
        for filename, arcs in iitems(arc_data):
            self.arcs.setdefault(filename, {}).update(arcs)

    def touch_file(self, filename):
        """Ensure that `filename` appears in the data, empty if needed."""
        self.lines.setdefault(filename, {})

    def measured_files(self):
        """A list of all files that had been measured."""
        return list(self.lines.keys())

    def executed_lines(self, filename):
        """A map containing all the line numbers executed in `filename`.

        If `filename` hasn't been collected at all (because it wasn't executed)
        then return an empty map.

        """
        return self.lines.get(filename) or {}

    def executed_arcs(self, filename):
        """A map containing all the arcs executed in `filename`.

        If there is no arc data for `filename`, return an empty map.

        """
        return self.arcs.get(filename) or {}

    def add_to_hash(self, filename, hasher):
        """Contribute `filename`'s data to the Md5Hash `hasher`.

        The executed lines are added first, then the executed arcs.

        """
        hasher.update(self.executed_lines(filename))
        hasher.update(self.executed_arcs(filename))

    def summary(self, fullpath=False):
        """Return a dict summarizing the coverage data.

        Keys are based on the filenames, and values are the number of executed
        lines.  If `fullpath` is true, then the keys are the full pathnames of
        the files, otherwise they are the basenames of the files.

        Note that with basenames, files in different directories that share a
        name end up under the same key, and only one of them is kept.

        """
        summ = {}
        for filename, lines in iitems(self.lines):
            if fullpath:
                key = filename
            else:
                key = os.path.basename(filename)
            summ[key] = len(lines)
        return summ

    def has_arcs(self):
        """Does this data have arcs?"""
        return bool(self.arcs)


if __name__ == '__main__':
    # Ad-hoc: show the raw data in a data file.  The file to dump is the
    # first command-line argument if there is one, otherwise the default
    # data file name.
    import pprint
    import sys
    covdata = CoverageData()
    fname = covdata.filename
    cli_args = sys.argv[1:]
    if cli_args:
        fname = cli_args[0]
    pprint.pprint(covdata.raw_data(fname))