blob: 6933b86d85e9f7114059815f0879aa9747f8f34f [file] [log] [blame]
mblighb0fab822007-07-25 16:40:19 +00001__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
mbligh26f0d882009-06-22 18:30:01 +00003import sys, os, subprocess, traceback, time, signal, pickle, logging
jadmanskid93d7d22008-05-29 21:37:29 +00004
5from autotest_lib.client.common_lib import error, utils
mblighb0fab822007-07-25 16:40:19 +00006
7
# Entry points that use subcommand must set this to their logging manager
# to get log redirection for subcommands. When it is set, every forked
# subcommand that was given a subdir calls
# logging_manager_object.tee_redirect_debug_dir(<subdir>/debug, tag=<subdir name>)
# (see subcommand.redirect_output). When left as None, no redirection is done.
logging_manager_object = None
11
12
def parallel(tasklist, timeout=None, return_results=False):
    """
    Run a set of predefined subcommands in parallel.

    Every task is forked first. Then each task is waited for in turn, all
    sharing one overall deadline when a timeout is given, and its pickled
    result is read back from task.result_pickle.

    NOTE(review): a task's result is read only after fork_waitfor() returns,
    i.e. after the child has exited. A child whose pickled result is larger
    than the OS pipe buffer would block in its write until killed. Confirm
    that results stay small.

    @param tasklist: A list of subcommand instances to execute.
    @param timeout: Number of seconds after which the commands should timeout.
    @param return_results: If True, instead of an AutoservError being raised
            on any error, a list of the results|exceptions from the tasks is
            returned. [default: False]

    @returns None, or (when return_results is True) a list holding, per
            task, its result or the exception describing its failure.
    @raises error.AutoservError: if any task failed and return_results is
            False.
    """
    run_error = False
    for task in tasklist:
        task.fork_start()

    remaining_timeout = None
    if timeout:
        endtime = time.time() + timeout

    results = []
    for task in tasklist:
        if timeout:
            # Every task gets at least 1s, even once the deadline has passed.
            remaining_timeout = max(endtime - time.time(), 1)
        try:
            status = task.fork_waitfor(remaining_timeout)
        except error.AutoservSubcommandError:
            run_error = True
        else:
            # A status of None means the task timed out and was killed.
            if status != 0:
                run_error = True

        try:
            try:
                result = pickle.load(task.result_pickle)
            except Exception:
                # A child that was killed (e.g. on timeout) or died before
                # writing its result leaves the pipe empty (EOFError) or
                # truncated. Report that instead of crashing the whole run.
                run_error = True
                result = error.AutoservError(
                    'could not read result of subcommand %s: %r'
                    % (getattr(task, 'func', task), sys.exc_info()[1]))
        finally:
            task.result_pickle.close()
        results.append(result)

    if return_results:
        return results
    elif run_error:
        raise error.AutoservError('One or more subcommands failed')
mblighb0fab822007-07-25 16:40:19 +000051
52
def parallel_simple(function, arglist, log=True, timeout=None,
                    return_results=False):
    """
    Run function once per element of arglist, in parallel subcommands.

    Each element of arglist is used both as the name of a subcommand's
    subdirectory (when log is True) and as the single argument passed to
    function. A subcommand object is created for each element, and those
    subcommand objects are then executed in parallel.

    NOTE: As an optimization, if arglist has exactly one element no
    subcommand is used. function is called directly in this process, so no
    subdirectory is created. Unless return_results is True, any exception it
    raises propagates unchanged.

    @param function: A callable to run in parallel once per arg in arglist.
    @param arglist: An iterable of single arguments, one per subcommand;
            typically a list of machine names. None is treated like an empty
            list.
    @param log: If True, each subcommand is given a subdirectory named after
            its arg for its output.
    @param timeout: Number of seconds after which the commands should timeout.
    @param return_results: If True, instead of an AutoservError being raised
            on any error, a list of the results|exceptions from the function
            called on each arg is returned. [default: False]

    @returns None or a list of results/exceptions.
    """
    # Materialize once: len() and indexing below need a real sequence. This
    # also lets callers pass tuples, generators or other iterables.
    if arglist is None:
        arglist = []
    else:
        arglist = list(arglist)
    if not arglist:
        logging.warning("parallel_simple was called with an empty arglist, "
                        "did you forget to pass in a list of machines?")

    # Bypass forking if only one machine.
    if len(arglist) == 1:
        arg = arglist[0]
        if not return_results:
            function(arg)
            return None
        try:
            result = function(arg)
        except Exception:
            # sys.exc_info() rather than "except ... as" keeps this valid on
            # both old Python 2 releases and Python 3.
            return [sys.exc_info()[1]]
        return [result]

    subcommands = []
    for arg in arglist:
        if log:
            subdir = str(arg)
        else:
            subdir = None
        subcommands.append(subcommand(function, [arg], subdir))
    return parallel(subcommands, timeout, return_results=return_results)
mblighb0fab822007-07-25 16:40:19 +0000102
103
class subcommand(object):
    """
    Run func(*args) in a forked child process.

    fork_start() forks the child and returns immediately in the parent, which
    can then use poll(), wait() or fork_waitfor() to collect it. The child
    pickles either func's return value or the Exception it raised and writes
    it to a pipe. The parent reads it from self.result_pickle.
    """
    # These lists live on the class, so a registered hook applies to every
    # subcommand instance.
    fork_hooks, join_hooks = [], []

    def __init__(self, func, args, subdir = None):
        """
        @param func: The callable to run in the child, as func(*args).
        @param args: Sequence of positional arguments for func.
        @param subdir: Optional directory the child runs in. It is created,
                together with a 'debug' subdirectory, if missing.
        """
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
        else:
            self.subdir = None
            self.debug = None

        self.func = func
        self.args = args
        self.lambda_function = lambda: func(*args)
        self.pid = None
        self.returncode = None


    @classmethod
    def register_fork_hook(cls, hook):
        """
        Register a function to be called, as hook(subcommand), from the child
        process after forking and before func runs. An exception raised by a
        fork hook is treated like one raised by func.
        """
        cls.fork_hooks.append(hook)


    @classmethod
    def register_join_hook(cls, hook):
        """
        Register a function to be called, as hook(subcommand), from the child
        process just before the child process terminates (joins to the
        parent). It runs after func, whether func returned or raised.
        """
        cls.join_hooks.append(hook)


    def redirect_output(self):
        """
        Ask the module-level logging_manager_object to tee-redirect output
        into this subcommand's debug directory, tagged with the subdir's
        basename. Does nothing unless a subdir was given and
        logging_manager_object has been set.
        """
        if self.subdir and logging_manager_object:
            tag = os.path.basename(self.subdir)
            logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)


    @staticmethod
    def _write_all(fd, data):
        """Write all of data to fd, continuing after short writes."""
        while data:
            data = data[os.write(fd, data):]


    @staticmethod
    def _pickle_exception(exc):
        """
        Return pickled data for exc. If exc itself cannot be pickled, fall
        back to a plain Exception carrying its class name and message.
        """
        try:
            return pickle.dumps(exc)
        except Exception:
            return pickle.dumps(
                Exception('%s: %s' % (exc.__class__.__name__, exc)))


    def fork_start(self):
        """
        Fork a child process that runs func(*args).

        In the parent this returns immediately, with self.pid set and
        self.result_pickle open on the read end of the result pipe. The
        child never returns: it always ends in os._exit(), with status 0 if
        func succeeded and its result was written, and 1 otherwise.
        """
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        self.pid = os.fork()

        if self.pid:                            # I am the parent
            os.close(w)
            # The child sends pickle data, so read the pipe in binary mode.
            self.result_pickle = os.fdopen(r, 'rb')
            return

        # We are the child from this point on. Never return. An exception
        # escaping this method would make the child resume the caller's
        # (i.e. the parent's) code, so everything below ends in os._exit().
        exit_code = 1
        try:
            try:
                os.close(r)
                signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
                if self.subdir:
                    os.chdir(self.subdir)
                self.redirect_output()

                try:
                    for hook in self.fork_hooks:
                        hook(self)
                    result = self.lambda_function()
                    self._write_all(w, pickle.dumps(result))
                except Exception:
                    traceback.print_exc()
                    self._write_all(
                        w, self._pickle_exception(sys.exc_info()[1]))
                else:
                    exit_code = 0
                os.close(w)

                for hook in self.join_hooks:
                    hook(self)
            except:
                # Last-resort boundary for anything that escaped above,
                # including SystemExit/KeyboardInterrupt: report it, then
                # fall through to os._exit() below.
                traceback.print_exc()
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)


    @staticmethod
    def _decode_exitstatus(sts):
        """
        Convert a raw os.waitpid() status into a return code, following the
        subprocess.Popen convention: -N if killed by signal N, otherwise the
        exit status.

        @raises RuntimeError: if sts describes neither an exit nor a signal.
        """
        if os.WIFSIGNALED(sts):
            return -os.WTERMSIG(sts)
        if os.WIFEXITED(sts):
            return os.WEXITSTATUS(sts)
        # Should never happen
        raise RuntimeError("Unknown child exit status!")


    def _handle_exitstatus(self, sts):
        """
        Record the child's return code from a raw waitpid() status.

        This is partially borrowed from subprocess.Popen. On a non-zero
        return code, a failure report (plus the debug dir's autoserv.stderr,
        if present) is written to stdout.

        @raises error.AutoservSubcommandError: if the return code is non-zero.
        """
        self.returncode = self._decode_exitstatus(sts)

        if self.returncode != 0:
            out = sys.stdout
            out.write("subcommand failed pid %d\n" % self.pid)
            out.write("%s\n" % (self.func,))
            out.write("rc=%d\n" % self.returncode)
            out.write("\n")
            if self.debug:
                stderr_file = os.path.join(self.debug, 'autoserv.stderr')
                if os.path.exists(stderr_file):
                    stderr_fh = open(stderr_file)
                    try:
                        for line in stderr_fh:
                            out.write(line)
                    finally:
                        stderr_fh.close()
            out.write("\n--------------------------------------------\n\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)


    def poll(self):
        """
        Check without blocking whether the child has exited.

        This is borrowed from subprocess.Popen.

        @returns The return code, or None while the child is still running
                (or if waitpid() fails).
        @raises error.AutoservSubcommandError: if the child just exited with
                a non-zero return code.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
            except OSError:
                return self.returncode
            # Kept outside the try so that only waitpid() errors are ignored.
            if pid == self.pid:
                self._handle_exitstatus(sts)
        return self.returncode


    def wait(self):
        """
        Block until the child exits.

        This is borrowed from subprocess.Popen.

        @returns The child's return code.
        @raises error.AutoservSubcommandError: if it is non-zero.
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode


    def _reap_killed_child(self):
        """
        Collect the exit status of a child that was just killed, so that it
        does not linger as a zombie. The status is recorded in
        self.returncode without the failure report _handle_exitstatus()
        prints.
        """
        try:
            pid, sts = os.waitpid(self.pid, 0)
        except OSError:
            # Already reaped, or not our child: nothing left to collect.
            return
        if pid == self.pid:
            try:
                self.returncode = self._decode_exitstatus(sts)
            except RuntimeError:
                pass


    def fork_waitfor(self, timeout=None):
        """
        Wait for the child to exit, killing it if it overruns timeout.

        @param timeout: Seconds to wait, polled at 1s granularity. A falsy
                value waits indefinitely.
        @returns The child's return code, or None if it timed out and was
                killed.
        @raises error.AutoservSubcommandError: if the child exits with a
                non-zero return code within the timeout.
        """
        if not timeout:
            return self.wait()

        deadline = time.time() + timeout
        while time.time() <= deadline:
            self.poll()
            if self.returncode is not None:
                return self.returncode
            time.sleep(1)

        utils.nuke_pid(self.pid)
        self._reap_killed_child()
        out = sys.stdout
        out.write("subcommand failed pid %d\n" % self.pid)
        out.write("%s\n" % (self.func,))
        out.write("timeout after %ds\n" % timeout)
        out.write("\n")
        return None
255 return None