blob: 26db28a29f623f9fc8e8dcf0d065bd1293e38140 [file] [log] [blame]
Dennis Kempin58d9a742014-05-08 18:34:59 -07001# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4#
5""" This module provides various tools for writing cros developer tools.
6
7This includes tools for regular expressions, path processing, chromium os git
8repositories, shell commands, and user interaction via command line.
9"""
10import os
11import re
12import shlex
13import shutil
14import subprocess
15import sys
16import time
17
18class Emerge(object):
19 """Provides tools for emerging and deploying packages to chromebooks."""
20
21 def __init__(self, board_variant):
22 self.cmd = "emerge-{}".format(board_variant)
23
24 def Emerge(self, package_name):
25 SafeExecute([self.cmd, package_name], verbose=True)
26
27 def Deploy(self, package_name, remote, emerge=True):
28 if emerge:
29 self.Emerge(package_name)
30 SafeExecute(["cros", "deploy", remote.ip, package_name], verbose=True)
31
32
33class RegexException(Exception):
34 """Describes a regular expression failure."""
35 def __init__(self, regex, string, match_type):
36 Exception.__init__(self)
37 self.regex = regex
38 self.string = string
39 self.match_type = match_type
40
41 def __str__(self):
42 capped = self.string
43 if len(capped) > 256:
44 capped = capped[1:251] + "[...]"
45 msg = "RequiredRegex \"{}\" failed to {} on:\n{}\n"
46 return msg.format(self.regex, self.match_type, capped)
47
48
49class RequiredRegex(object):
50 """Wrapper for regular expressions using exceptions.
51
52 Most regular expression calls in mttools are used for parsing and any
53 mismatches result in a program failure.
54 To reduce the amount of error checking done in place this wrapper throws
55 meaningful exceptions in case of mismatches.
56 """
57 cap_length = 30
58
59 def __init__(self, pattern):
60 self.pattern = pattern
61
62 def Search(self, string, must_succeed=True):
63 match = re.search(self.pattern, string)
64 return self.__Check(match, string, must_succeed, "search")
65
66 def Match(self, string, must_succeed=True):
67 match = re.match(self.pattern, string)
68 return self.__Check(match, string, must_succeed, "match")
69
70 def __Check(self, match, string, must_succeed, match_type):
71 if not match and must_succeed:
72 raise RegexException(self.pattern, string, match_type)
73 return match
74
75
76class Path(object):
77 """Wrapper for os.path functions enforcing absolute/real paths.
78
79 This wrapper helps processing file paths by enforcing paths to be
80 always absolute and with symlinks resolved. Being a class it also
81 allows a more compact syntax for common operations.
82 """
83 def __init__(self, path, *args):
84 if isinstance(path, Path):
85 self.path = path.path
86 else:
87 self.path = os.path.abspath(path)
88
89 for arg in args:
90 self.path = os.path.join(self.path, str(arg))
91
92 def ListDir(self):
93 for entry in os.listdir(self.path):
94 yield Path(self, entry)
95
96 def Read(self):
97 return open(self.path, "r").read()
98
99 def Open(self, props):
100 return open(self.path, props)
101
102 def Write(self, value):
103 self.Open("w").write(value)
104
105 def Join(self, other):
106 return Path(os.path.join(self.path, other))
107
108 def CopyTo(self, target):
109 shutil.copy(self.path, str(target))
110
111 def MoveTo(self, target):
112 target = Path(target)
113 shutil.move(self.path, str(target))
114
115 def RelPath(self, rel=os.curdir):
116 return os.path.relpath(self.path, str(rel))
117
118 def RmTree(self):
119 return shutil.rmtree(self.path)
120
121 def MakeDirs(self):
122 if not self.exists:
123 os.makedirs(self.path)
124
125 @property
126 def parent(self):
127 return Path(os.path.dirname(self.path))
128
129 @property
130 def exists(self):
131 return os.path.exists(self.path)
132
133 @property
134 def is_file(self):
135 return os.path.isfile(self.path)
136
137 @property
138 def is_link(self):
139 return os.path.islink(self.path)
140
141 @property
142 def is_dir(self):
143 return os.path.isdir(self.path)
144
145 @property
146 def basename(self):
147 return os.path.basename(self.path)
148
149 def __eq__(self, other):
150 return self.path == other.path
151
152 def __ne__(self, other):
153 return self.path != other.path
154
155 def __div__(self, other):
156 return self.Join(other)
157
158 def __bool__(self):
159 return self.exists
160
161 def __str__(self):
162 return self.path
163
164
165class ExecuteException(Exception):
166 """Describes a failure to run a shell command."""
167 def __init__(self, command, code, out, verbose=False):
168 Exception.__init__(self)
169 self.command = command
170 self.code = code
171 self.out = out
172 self.verbose = verbose
173
174 def __str__(self):
175 string = "$ %s\n" % " ".join(self.command)
176 if self.out:
177 string = string + str(self.out) + "\n"
178 string = string + "Command returned " + str(self.code)
179 return string
180
181
182def Execute(command, cwd=None, verbose=False, interactive=False,
183 must_succeed=False):
184 """Execute shell command.
185
186 Returns false if the command failed (i.e. return code != 0), otherwise
187 this command returns the stdout/stderr output of the command.
188 The verbose flag will print the executed command.
189 The interactive flag will not capture stdout/stderr, but allow direct
190 interaction with the command.
191 """
192 if must_succeed:
193 return SafeExecute(command, cwd, verbose, interactive)
194 (code, out) = __Execute(command, cwd, verbose, interactive)
195 if code != 0:
196 return False
197 return out
198
199
200def SafeExecute(command, cwd=None, verbose=False, interactive=False):
201 """Execute shell command and throw exception upon failure.
202
203 This method behaves the same as Execute, but throws an ExecuteException
204 if the command fails.
205 """
206 if isinstance(command, basestring):
207 command = shlex.split(command)
208 (code, out) = __Execute(command, cwd, verbose, interactive)
209 if code != 0:
210 raise ExecuteException(command, code, out, verbose)
211 return out
212
213def __Execute(command, cwd=None, verbose=False, interactive=False):
214 if isinstance(command, basestring):
215 command = shlex.split(command)
216
217 if cwd:
218 cwd = str(cwd)
219
220 if verbose:
221 print "$", " ".join(command)
222
223 if interactive:
224 process = subprocess.Popen(command, cwd=cwd)
225 else:
226 process = subprocess.Popen(command,
227 stdout=subprocess.PIPE,
228 stderr=subprocess.STDOUT,
229 cwd=cwd)
230 if verbose:
231 # print a . approximately every second to show progress
232 seconds = 0
233 while process.poll() is None:
234 seconds += 0.01
235 time.sleep(0.01)
236 if seconds > 1:
237 seconds = seconds - 1
238 sys.stdout.write(".")
239 sys.stdout.flush()
240 print ""
241 else:
242 process.wait()
243
244 out = None
245 if not interactive:
246 out = process.stdout.read().replace("\r", "").strip()
247
248 return (process.returncode, out)
249
250class GitRepo(object):
251 """A helper class to work with Git repositories.
252
253 This class is specialized to deal with chromium specific git workflows
254 and uses the git command line program to interface with the repository.
255 """
256 def __init__(self, repo_path, verbose=False):
257 self.path = Path(repo_path)
258 self.verbose = verbose
259 rel_path = self.path.RelPath("/mnt/host/source/src/")
260 self.review_url = ("https://chrome-internal.googlesource.com/chromeos/" +
261 rel_path)
262
263 def SafeExecute(self, command):
264 if isinstance(command, str):
265 command = "git " + command
266 else:
267 command = ["git"] + command
268 return SafeExecute(command, cwd=self.path, verbose=self.verbose)
269
270 def Execute(self, command):
271 if isinstance(command, str):
272 command = "git " + command
273 else:
274 command = ["git"] + command
275 return Execute("git " + command, cwd=self.path, verbose=self.verbose)
276
277 @property
278 def active_branch(self):
279 result = self.SafeExecute("branch")
280
281 regex = "\\* (\\S+)"
282 match = re.search(regex, result)
283 if match:
284 return match.group(1)
285 return None
286
287 @property
288 def branches(self):
289 branches = []
290 result = self.SafeExecute("branch")
291 regex = "(\\S+)$"
292 for match in re.finditer(regex, result):
293 branches.append(match.group(1))
294 print "Branches:", branches
295 return branches
296
297 @property
298 def working_directory_dirty(self):
299 result = self.SafeExecute("status")
300 return "Changes not staged for commit" in result
301
302 @property
303 def index_dirty(self):
304 result = self.SafeExecute("status")
305 return "Changes to be committed" in result
306
307 @property
308 def diverged(self):
309 result = self.SafeExecute("status")
310 return "Your branch is ahead of" in result
311
312 @property
313 def remote(self):
314 return self.SafeExecute("remote").strip()
315
316 @property
317 def status(self):
318 status = []
319 if self.index_dirty:
320 status.append("changes in index")
321 if self.diverged:
322 status.append("diverged from master")
323 if self.working_directory_dirty:
324 status.append("local changes")
325 if not status:
326 status.append("clean")
327 return ", ".join(status)
328
329 def Move(self, source, destination):
330 source = Path(source).RelPath(self.path)
331 destination = Path(destination).RelPath(self.path)
332 self.SafeExecute(["mv", source, destination])
333
334 def DeleteBranch(self, branch_name):
335 if branch_name not in self.branches:
336 return
337 self.SafeExecute("checkout -f m/master")
338 self.SafeExecute("branch -D " + branch_name)
339
340 def CreateBranch(self, branch_name, tracking=None):
341 if tracking:
342 self.SafeExecute("checkout -b " + branch_name + " " + tracking)
343 else:
344 self.SafeExecute("checkout -b " + branch_name)
345
346 def Checkout(self, name, force=False):
347 cmd = "checkout " + name
348 if force:
349 cmd = cmd + " -f"
350 self.SafeExecute(cmd)
351
352 def Stash(self):
353 self.SafeExecute("stash")
354
355 def Add(self, filepath):
356 self.SafeExecute(["add", Path(filepath).RelPath(self.path)])
357
358 def Commit(self, message, ammend=False, all_changes=False,
359 ammend_if_diverged=False):
360 cmd = "commit"
361 if ammend or (ammend_if_diverged and self.diverged):
362 existing = self.SafeExecute("log --format=%B -n 1")
363 regex = "Change-Id: I[a-f0-9]+"
364 match = re.search(regex, existing)
365 if match:
366 message = message + "\n" + match.group(0)
367 cmd = cmd + " --amend"
368 if all_changes:
369 cmd = cmd + " -a"
370 cmd = cmd + " -m \"" + message + "\""
371 self.SafeExecute(cmd)
372
373 def Upload(self):
374 result = self.SafeExecute("push %s HEAD:refs/for/master" % self.remote)
375 regex = "https://[a-zA-Z0-9\\-\\./]+"
376 match = re.search(regex, result)
377 if match:
378 return match.group(0)
379 return None
380
381class AskUser(object):
382 """Various static methods to ask for user input."""
383
384 @staticmethod
385 def Text(message=None, validate=None, default=None):
386 """Ask user to input a text."""
387 while True:
388 if message:
389 print message
390 reply = sys.stdin.readline().strip()
391 if len(reply) == 0:
392 return default
393 if validate:
394 try:
395 reply = validate(reply)
396 except Exception, e:
397 print e
398 continue
399 return reply
400
401 @staticmethod
402 def Continue():
403 """Ask user to press enter to continue."""
404 AskUser.Text("Press enter to continue.")
405
406 @staticmethod
407 def YesNo(message, default=False):
408 """Ask user to reply with yes or no."""
409 if default is True:
410 print message, "[Y/n]"
411 else:
412 print message, "[y/N]"
413 choice = sys.stdin.readline().strip()
414 if choice in ("y", "Y", "Yes", "yes"):
415 return True
416 elif choice in ("n", "N", "No", "no"):
417 return False
418 return default
419
420 @staticmethod
421 def Select(choices, msg, allow_none=False):
422 """Ask user to make a single choice from a list.
423
424 Returns the index of the item selected by the user.
425
426 allow_none allows the user to make an empty selection, which
427 will return None.
428
429 Note: Both None and the index 0 will evaluate to boolean False,
430 check the value explicitly with "if selection is None".
431 """
432 selection = AskUser.__Select(choices, msg, False, allow_none)
433 if len(selection) == 0:
434 return None
435 return selection[0]
436
437 @staticmethod
438 def SelectMulti(choices, msg, allow_none=False):
439 """Ask user to make a multiple choice from a list.
440
441 Returns the list of indices selected by the user.
442
443 allow_none allows the user to make an empty selection, which
444 will return an empty list.
445 """
446 return AskUser.__Select(choices, msg, True, allow_none)
447
448 @staticmethod
449 def __Select(choices, msg, allow_multi=False, allow_none=False):
450 # skip selection if there is only one option
451 if len(choices) == 1 and not allow_none:
452 return [0]
453
454 # repeat until user made a valid selection
455 while True:
456 # get user input (displays index + 1).
457 print msg
458 for i, item in enumerate(choices):
459 print " ", str(i + 1) + ":", item
460 if allow_multi:
461 print "(Separate multiple selections with spaces)"
462 if allow_none:
463 print "(Press enter to select none)"
464
465 sys.stdout.write('> ')
466 sys.stdout.flush()
467 selection = sys.stdin.readline()
468
469 if len(selection.strip()) == 0 and allow_none:
470 return []
471
472 if allow_multi:
473 selections = selection.split(" ")
474 else:
475 selections = [selection]
476
477 # validates single input value
478 def ProcessSelection(selection):
479 try:
480 idx = int(selection) - 1
481 if idx < 0 or idx >= len(choices):
482 print 'Number out of range'
483 return None
484 except ValueError:
485 print 'Not a number'
486 return None
487 return idx
488
489 # validate list of values
490 selections = [ProcessSelection(s) for s in selections]
491 if None not in selections:
492 return selections
493