blob: d81a9745d3ebe9c76569736f025d326aa50a4d3e [file] [log] [blame]
mblighb0fab822007-07-25 16:40:19 +00001__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
mblighe1836812009-04-17 18:25:14 +00003import sys, os, subprocess, traceback, time, signal, pickle
jadmanskid93d7d22008-05-29 21:37:29 +00004
5from autotest_lib.client.common_lib import error, utils
mblighb0fab822007-07-25 16:40:19 +00006
7
def parallel(tasklist, timeout=None, return_results=False):
    """Run a set of predefined subcommands in parallel.

    Every task is started with fork_start() and then waited for, in list
    order, with fork_waitfor().  Afterwards the object the child pickled
    into task.result_pickle is loaded and the pipe is closed.

    @param tasklist: objects providing fork_start(), fork_waitfor(timeout)
            and, once started, a result_pickle file object.
    @param timeout: overall time budget in seconds shared by all tasks;
            a false value means wait forever.  Each task is always
            given at least one second.
    @param return_results: if True, instead of an exception being raised
            on any error, a list of the results/exceptions from the
            functions is returned.

    @returns the list of results when return_results is True, else None.
    @raises error.AutoservError when return_results is False and at least
            one task failed, timed out or delivered no readable result.

    NOTE(review): a child is waited for before its result pipe is read, so
    a result whose pickle does not fit in the pipe buffer could leave the
    child blocked on its write - confirm that results stay small.
    """
    run_error = False
    for task in tasklist:
        task.fork_start()

    remaining_timeout = None
    if timeout:
        endtime = time.time() + timeout

    results = []
    for task in tasklist:
        if timeout:
            remaining_timeout = max(endtime - time.time(), 1)
        try:
            status = task.fork_waitfor(remaining_timeout)
        except error.AutoservSubcommandError:
            run_error = True
        else:
            # fork_waitfor() hands back None on timeout, which also
            # counts as a failure here.
            if status != 0:
                run_error = True

        try:
            try:
                results.append(pickle.load(task.result_pickle))
            except Exception:
                # A child that was killed (e.g. on timeout) never wrote a
                # result.  Record the load error as that task's result so
                # the remaining tasks are still collected.
                run_error = True
                results.append(sys.exc_info()[1])
        finally:
            task.result_pickle.close()

    if return_results:
        return results
    elif run_error:
        raise error.AutoservError('One or more subcommands failed')
mblighb0fab822007-07-25 16:40:19 +000042
43
def parallel_simple(function, arglist, log=True, timeout=None):
    """Call "function" once per element of arglist, in parallel.

    One subcommand object is built per element; the element is the single
    argument handed to "function" and, when log is true, its string form
    is also used as the subcommand's subdir name.  The subcommands are
    then executed together through parallel().

    A one-element arglist is a special case: the function is simply
    called in the current process, and log and timeout are not used."""

    # No need to fork anything for a single machine.
    if len(arglist) == 1:
        function(arglist[0])
        return

    def _subdir_for(arg):
        # Per-argument results directory, or None when logging is off.
        if log:
            return str(arg)
        return None

    jobs = [subcommand(function, [arg], _subdir_for(arg))
            for arg in arglist]
    parallel(jobs, timeout)
mblighb0fab822007-07-25 16:40:19 +000065
66
def _where_art_thy_filehandles():
    """Debugging aid: run "ls -l" on this process's /proc fd directory
    through the shell, appending the listing to /dev/tty."""
    listing_cmd = "ls -l /proc/%d/fd >> /dev/tty" % os.getpid()
    os.system(listing_cmd)
mblighb0fab822007-07-25 16:40:19 +000069
mblighdc735a22007-08-02 16:54:37 +000070
def _print_to_tty(string):
    """Debugging aid: write string, followed by a newline, to /dev/tty.

    The tty is closed explicitly instead of relying on garbage collection
    of the file object, so the text is flushed before returning."""
    tty = open('/dev/tty', 'w')
    try:
        tty.write(string + '\n')
    finally:
        tty.close()
mblighb0fab822007-07-25 16:40:19 +000073
74
def _redirect_stream(fd, output):
    """Point file descriptor fd at the file "output", opened for append.

    The file is created if it does not exist.  When fd is 1 or 2 the
    matching sys.stdout / sys.stderr object is rebuilt on top of the
    descriptor, so Python-level writes follow the redirection too.

    @param fd: the file descriptor number to redirect.
    @param output: path of the file that receives the output.
    """
    newfd = os.open(output, os.O_WRONLY | os.O_CREAT | os.O_APPEND)
    # If fd was closed beforehand, os.open() may hand back fd itself.  The
    # descriptor is then already in place and must not be closed again.
    if newfd != fd:
        os.dup2(newfd, fd)
        os.close(newfd)
    if fd == 1:
        sys.stdout = os.fdopen(fd, 'w')
    if fd == 2:
        sys.stderr = os.fdopen(fd, 'w')
mblighb0fab822007-07-25 16:40:19 +000083
84
def _redirect_stream_tee(fd, output, tag):
    """Redirect fd into a forked helper that tees it to a log file.

    Use the low-level fork & pipe operations here to get a fd,
    not a filehandle.  This ensures that we get both the
    filehandle and fd for stdout/stderr redirected correctly.

    The calling process gets fd replaced by the write end of a pipe (and,
    for fd 1 or 2, a line-buffered sys.stdout / sys.stderr on top of it)
    and returns.  The forked helper reads the pipe line by line, appends
    each line to "output" and echoes it on its own stdout prefixed with
    "<tag> : ".  The helper always ends with os._exit() and never returns.

    @param fd: the file descriptor number to redirect.
    @param output: path of the log file the helper appends to.
    @param tag: prefix put in front of every echoed line.
    """
    r, w = os.pipe()
    pid = os.fork()
    if pid:                                 # Parent
        os.dup2(w, fd)
        os.close(r)
        os.close(w)
        if fd == 1:
            sys.stdout = os.fdopen(fd, 'w', 1)
        if fd == 2:
            sys.stderr = os.fdopen(fd, 'w', 1)
        return

    # Child.  Whatever goes wrong below, leave through os._exit() so this
    # helper can never unwind into (and re-run) the caller's code.
    status = 1
    try:
        try:
            signal.signal(signal.SIGTERM, signal.SIG_DFL)  # clear handler
            os.close(w)
            log = open(output, 'a')
            try:
                f = os.fdopen(r, 'r')
                for line in iter(f.readline, ''):
                    # Tee straight to file
                    log.write(line)
                    log.flush()
                    # Prepend stdout with the tag
                    sys.stdout.write(tag + ' : ' + line)
                    sys.stdout.flush()
            finally:
                log.close()
            status = 0
        except Exception:
            traceback.print_exc()
    finally:
        os._exit(status)
mblighb0fab822007-07-25 16:40:19 +0000114
115
class subcommand(object):
    """A function call that is executed in a forked child process.

    fork_start() forks; the child calls func(*args), pickles the return
    value (or the exception that was raised) into a pipe and exits, while
    the parent keeps the read end of that pipe in self.result_pickle.
    poll(), wait() and fork_waitfor() collect the child's exit status.
    """

    # Hook lists live on the class, so they are shared by all subcommands.
    fork_hooks, join_hooks = [], []

    def __init__(self, func, args, subdir = None, stdprint = True):
        """
        @param func: the function to run; it is called as func(*args).
        @param args: sequence of positional arguments for func.
        @param subdir: the subdirectory to log results in.  It and a
                'debug' directory below it are created when missing.
                Without a subdir the output paths are /dev/null.
        @param stdprint: whether to print results to stdout/stderr.
        """
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
            self.stdout = os.path.join(self.debug, 'stdout')
            self.stderr = os.path.join(self.debug, 'stderr')
        else:
            self.subdir = None
            self.debug = '/dev/null'
            self.stdout = '/dev/null'
            self.stderr = '/dev/null'

        self.func = func
        self.args = args
        self.lambda_function = lambda: func(*args)
        self.pid = None
        self.stdprint = stdprint
        self.returncode = None


    @classmethod
    def register_fork_hook(cls, hook):
        """ Register a function to be called from the child process after
        forking. It is called with the subcommand as its only argument. """
        cls.fork_hooks.append(hook)


    @classmethod
    def register_join_hook(cls, hook):
        """ Register a function to be called from the child process
        just before the child process terminates (joins to the parent).
        It is called with the subcommand as its only argument. """
        cls.join_hooks.append(hook)


    def redirect_output(self):
        """Redirect file descriptors 1 and 2 to self.stdout / self.stderr.

        With stdprint and a subdir the output is tee'd (log file plus
        tagged echo); with stdprint and no subdir nothing is redirected;
        without stdprint the output goes only to the files.
        """
        if self.stdprint:
            if self.subdir:
                tag = os.path.basename(self.subdir)
                _redirect_stream_tee(1, self.stdout, tag)
                _redirect_stream_tee(2, self.stderr, tag)
        else:
            _redirect_stream(1, self.stdout)
            _redirect_stream(2, self.stderr)


    @staticmethod
    def _pickle_exception(exc):
        """Pickle exc, or a plain Exception describing it when exc itself
        cannot be pickled, so the parent always receives something."""
        try:
            return pickle.dumps(exc)
        except Exception:
            return pickle.dumps(
                    Exception('unpicklable exception: %r' % (exc,)))


    def fork_start(self):
        """Fork and run the function in the child.

        In the parent this stores the child's pid in self.pid, opens the
        read end of the result pipe as self.result_pickle and returns.

        The child never returns from this method: it runs the fork hooks
        and the function, writes the pickled result (exit code 0) or the
        pickled exception (exit code 1) to the pipe, runs the join hooks
        and leaves through os._exit().  If it is cut short by anything
        else (for example SystemExit) it exits with code 1 and no result.
        """
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        self.pid = os.fork()

        if self.pid:                            # I am the parent
            os.close(w)
            # Pickle data is binary, so read the pipe in binary mode.
            self.result_pickle = os.fdopen(r, 'rb')
            return

        # We are the child from this point on. Never return.  The outer
        # try/finally guarantees os._exit() even for errors during setup
        # or for exceptions that "except Exception" does not catch, so
        # the child can never unwind into the parent's code.
        exit_code = 1
        try:
            try:
                os.close(r)
                signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
                if self.subdir:
                    os.chdir(self.subdir)
                self.redirect_output()

                for hook in self.fork_hooks:
                    hook(self)
                result = self.lambda_function()
                os.write(w, pickle.dumps(result))
            except Exception:
                e = sys.exc_info()[1]
                traceback.print_exc()
                os.write(w, self._pickle_exception(e))
            else:
                exit_code = 0

            for hook in self.join_hooks:
                hook(self)
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)


    def _handle_exitstatus(self, sts):
        """
        Decode a waitpid() status into self.returncode: the exit code, or
        the negated signal number when the child was killed by a signal.
        This is partially borrowed from subprocess.Popen.

        On a non-zero return code a failure report, including the
        contents of the stderr log, is printed.

        @raises error.AutoservSubcommandError if the return code is not 0.
        @raises RuntimeError if the status is neither an exit nor a signal.
        """
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

        if self.returncode != 0:
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("rc=%d" % self.returncode)
            print("")
            if os.path.exists(self.stderr):
                stderr_log = open(self.stderr)
                try:
                    for line in stderr_log.readlines():
                        sys.stdout.write(line)
                finally:
                    stderr_log.close()
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)


    def poll(self):
        """
        Check, without blocking, whether the child has finished.
        This is borrowed from subprocess.Popen.

        @returns self.returncode, which is still None while the child is
                running.
        @raises error.AutoservSubcommandError if the child has finished
                with a non-zero return code.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self._handle_exitstatus(sts)
            except os.error:
                # Deliberate best effort: a failing waitpid() leaves
                # returncode as None.
                pass
        return self.returncode


    def wait(self):
        """
        Block until the child has finished.
        This is borrowed from subprocess.Popen.

        @returns self.returncode.
        @raises error.AutoservSubcommandError if the child finished with
                a non-zero return code.
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode


    def fork_waitfor(self, timeout=None):
        """Wait for the child, optionally giving up after a timeout.

        @param timeout: seconds to wait; a false value waits forever.
                With a timeout the child is polled once per second.

        @returns the child's return code, or None if it did not finish
                in time.  In that case its pid is passed to
                utils.nuke_pid() and a timeout report is printed.
        @raises error.AutoservSubcommandError if the child finished with
                a non-zero return code.
        """
        if not timeout:
            return self.wait()
        else:
            start_time = time.time()
            while time.time() <= start_time + timeout:
                self.poll()
                if self.returncode is not None:
                    return self.returncode
                time.sleep(1)

            utils.nuke_pid(self.pid)
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("timeout after %ds" % timeout)
            print("")
            return None
276 return None