dpranke@chromium.org | 2a00962 | 2011-03-01 02:43:31 +0000 | [diff] [blame] | 1 | # Copyright (c) 2010 The Chromium Authors. All rights reserved. |
| 2 | # Use of this source code is governed by a BSD-style license that can be |
| 3 | # found in the LICENSE file. |
| 4 | |
| 5 | """A database of OWNERS files.""" |
| 6 | |
class Assertion(AssertionError):
  """Assertion failure raised by the OWNERS database.

  Subclasses AssertionError, so handlers written for AssertionError also
  catch it.
  """
| 9 | |
dpranke@chromium.org | 2a00962 | 2011-03-01 02:43:31 +0000 | [diff] [blame] | 10 | |
class SyntaxErrorInOwnersFile(Exception):
  """Error describing a syntax problem at a given line of an OWNERS file.

  Attributes:
    path: path of the offending OWNERS file.
    line: line number, formatted with %d when the error is printed.
    msg: description of the problem; when falsy it is left out of str().
  """

  def __init__(self, path, line, msg):
    details = (path, line, msg)
    # The base exception receives the whole triple as its single argument.
    super(SyntaxErrorInOwnersFile, self).__init__(details)
    self.path, self.line, self.msg = details

  def __str__(self):
    if not self.msg:
      return "%s:%d syntax error" % (self.path, self.line)
    return "%s:%d syntax error: %s" % (self.path, self.line, self.msg)
| 23 | |
| 24 | |
# Wildcard entry that can appear in an OWNERS file in place of an
# email address.
# NOTE(review): nothing in the code visible here references this constant;
# presumably it means "any reviewer is acceptable" -- confirm with callers.
ANYONE = '*'
| 27 | |
| 28 | |
class Database(object):
  """A database of OWNERS files for a repository.

  This class allows you to find a suggested set of reviewers for a list
  of changed files, and see if a list of changed files is covered by a
  list of reviewers.

  OWNERS data is loaded lazily, the first time a file is queried, and is
  cached on the instance afterwards.
  """

  def __init__(self, root, fopen, os_path):
    """Args:
      root: the path to the root of the Repository
      fopen: function callback to open a text file for reading; it is called
        with the path of an OWNERS file and the result is iterated line by
        line
      os_path: module/object callback with fields for 'exists',
        'dirname', and 'join'
    """
    self.root = root
    self.fopen = fopen
    self.os_path = os_path

    # Mapping of each owner to the set of files they are authorized for.
    self.files_owned_by = {}

    # Mapping of each file to the set of its owners.
    self.owners_for = {}

    # In-memory cached map of files to their OWNERS files.
    self.owners_file_for = {}

    # In-memory cache of OWNERS files and their contents: maps the path of
    # an OWNERS file to the set of owners listed in it.
    self.owners_files = {}

  def ReviewersFor(self, files):
    """Returns a suggested set of reviewers that will cover the set of files.

    The set of files are paths relative to (and under) self.root."""
    self._LoadDataNeededFor(files)
    return self._CoveringSetOfOwnersFor(files)

  def FilesAreCoveredBy(self, files, reviewers):
    """Returns True if every file in files is owned by one of reviewers."""
    return not self.FilesNotCoveredBy(files, reviewers)

  def FilesNotCoveredBy(self, files, reviewers):
    """Returns the set of files not owned by any of the given reviewers.

    Args:
      files: iterable of paths relative to (and under) self.root.
      reviewers: iterable of owner names as they appear in OWNERS files.
    """
    files = set(files)
    # Ownership data is loaded lazily; without this the answer would depend
    # on whether ReviewersFor() happened to be called for these files first.
    self._LoadDataNeededFor(files)
    covered_files = set()
    for reviewer in reviewers:
      # A reviewer that owns nothing simply covers nothing.
      covered_files.update(self.files_owned_by.get(reviewer, ()))
    return files - covered_files

  def _LoadDataNeededFor(self, files):
    """Makes sure the owners of every file in files have been loaded."""
    for f in files:
      self._LoadOwnersFor(f)

  def _LoadOwnersFor(self, f):
    """Loads the owners of f, unless they were already loaded."""
    if f not in self.owners_for:
      owner_file = self._FindOwnersFileFor(f)
      self.owners_file_for[f] = owner_file
      self._ReadOwnersFile(owner_file, f)

  def _FindOwnersFileFor(self, f):
    """Returns the path of the closest OWNERS file at or above f's directory.

    Raises:
      Assertion: if no OWNERS file exists in any directory up to and
        including the top-most one (the empty dirname for relative paths).
    """
    dirname = self.os_path.dirname(f)
    while True:
      owner_path = self.os_path.join(dirname, 'OWNERS')
      if self.os_path.exists(owner_path):
        return owner_path
      if not dirname:
        break
      parent = self.os_path.dirname(dirname)
      # dirname() of a filesystem root returns the root itself; stop there
      # instead of looping forever.
      if parent == dirname:
        break
      dirname = parent
    raise Assertion('No OWNERS file found for %s' % f)

  def _ReadOwnersFile(self, owner_file, affected_file):
    """Records the owners listed in owner_file as owners of affected_file."""
    owners = self.owners_files.get(owner_file)
    if owners is None:
      # Parse before touching the ownership maps, so a failed read does not
      # leave affected_file looking like it was loaded with no owners.
      owners = self._ParseOwnersFile(owner_file)
      self.owners_files[owner_file] = owners
    self.owners_for.setdefault(affected_file, set()).update(owners)
    for owner in owners:
      self.files_owned_by.setdefault(owner, set()).add(affected_file)

  def _ParseOwnersFile(self, owner_file):
    """Returns the set of owners listed in owner_file, one per line.

    Surrounding whitespace is stripped; blank lines and lines starting
    with '#' are skipped since neither can name an owner.
    """
    handle = self.fopen(owner_file)
    try:
      owners = set()
      for line in handle:
        entry = line.strip()
        if not entry or entry.startswith('#'):
          continue
        owners.add(entry)
      return owners
    finally:
      # fopen is an injected callback and may return a plain iterable of
      # lines, so only close the result when it can be closed.
      close = getattr(handle, 'close', None)
      if close is not None:
        close()

  def _CoveringSetOfOwnersFor(self, files):
    """Returns the union of the owners of every file in files.

    The owners of each file must already have been loaded.
    """
    # TODO(dpranke): implement the greedy algorithm for covering sets, and
    # consider returning multiple options in case there are several equally
    # short combinations of owners.
    every_owner = set()
    for f in files:
      every_owner.update(self.owners_for[f])
    return every_owner