blob: 37feb88f304e46f60bf574dd305bf1762783627e [file] [log] [blame]
mblighb0fab822007-07-25 16:40:19 +00001__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
mblighe1836812009-04-17 18:25:14 +00003import sys, os, subprocess, traceback, time, signal, pickle
jadmanskid93d7d22008-05-29 21:37:29 +00004
5from autotest_lib.client.common_lib import error, utils
mblighb0fab822007-07-25 16:40:19 +00006
7
# Module-level hook consulted by subcommand.redirect_output().
# Entry points that use subcommand must set this to their logging manager
# to get log redirection for subcommands; while it is None no redirection
# is performed.
logging_manager_object = None
11
12
def parallel(tasklist, timeout=None, return_results=False):
    """Run a set of predefined subcommands in parallel.

    Every task is started with fork_start() and then waited on, in list
    order, with fork_waitfor().  After each wait the task's pickled result
    is read from its result_pickle file, which is then closed.

    tasklist - objects providing fork_start(), fork_waitfor(timeout) and,
               once started, a result_pickle file object.
    timeout - overall budget in seconds shared by all the tasks; a false
              value (None, 0) means wait without limit.  Each task is
              always given at least one second, even when the budget has
              already been used up.
    return_results - if True, instead of an exception being raised on any
                     error a list of the results/exceptions from the
                     functions is returned.  A task that exited without
                     writing any result (for example because it was killed
                     on timeout) is represented by an EOFError instance.

    Raises error.AutoservError when at least one subcommand failed and
    return_results is False.
    """
    run_error = False
    for task in tasklist:
        task.fork_start()

    remaining_timeout = None
    if timeout:
        endtime = time.time() + timeout

    results = []
    for task in tasklist:
        if timeout:
            # Never hand out less than one second, so that a late task
            # still gets one chance to be polled.
            remaining_timeout = max(endtime - time.time(), 1)
        try:
            status = task.fork_waitfor(remaining_timeout)
        except error.AutoservSubcommandError:
            run_error = True
        else:
            # fork_waitfor() hands back None on timeout; that is a failure
            # too.
            if status != 0:
                run_error = True

        try:
            try:
                result = pickle.load(task.result_pickle)
            except EOFError:
                # The child went away before writing anything to the pipe.
                # Record that as a failure instead of aborting the loop and
                # losing the results of the remaining tasks.
                run_error = True
                result = EOFError('subcommand exited without reporting '
                                  'a result')
        finally:
            task.result_pickle.close()
        results.append(result)

    if return_results:
        return results
    elif run_error:
        raise error.AutoservError('One or more subcommands failed')
mblighb0fab822007-07-25 16:40:19 +000047
48
def parallel_simple(function, arglist, log=True, timeout=None):
    """Call "function" once per element of arglist, each in a subcommand.

    Each element of arglist is passed to "function" as its single
    argument and, when log is true, its str() form is also used as the
    name of the subdirectory given to the subcommand.  The subcommand
    objects are then executed together through parallel().

    A one-element arglist is special-cased: "function" is simply called
    directly in this process, with no subcommand and no subdirectory.
    """
    # A single machine does not need the fork machinery at all.
    if len(arglist) == 1:
        function(arglist[0])
        return

    def results_dir(item):
        # Name of the log subdirectory for one argument, or None when
        # logging was not requested.
        if log:
            return str(item)
        return None

    tasks = [subcommand(function, [item], results_dir(item))
             for item in arglist]
    parallel(tasks, timeout)
mblighb0fab822007-07-25 16:40:19 +000070
71
class subcommand(object):
    """Run func(*args) in a forked child process and report its outcome.

    fork_start() forks; the child calls the function, pickles its return
    value (or the exception it raised) into a pipe and exits with status 0
    (or 1).  The parent keeps the read end of that pipe in
    self.result_pickle and collects the exit status with poll(), wait() or
    fork_waitfor().

    NOTE(review): the child writes its whole result before exiting, while
    parallel() in this file reads the pipe only after waiting for the
    child.  A result larger than the OS pipe buffer would therefore leave
    the child blocked in os.write() - confirm results stay small.
    """

    # Hook lists live on the class, so hooks registered through the
    # classmethods below apply to every subcommand.
    fork_hooks, join_hooks = [], []

    def __init__(self, func, args, subdir = None):
        # func(*args) - the subcommand to run
        # subdir - the subdirectory to log results in; it and its 'debug'
        #          subdirectory are created here if they do not exist yet
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
        else:
            self.subdir = None
            self.debug = None

        self.func = func
        self.args = args
        self.lambda_function = lambda: func(*args)
        self.pid = None
        self.returncode = None
        # Read end of the result pipe; set in the parent by fork_start().
        self.result_pickle = None


    @classmethod
    def register_fork_hook(cls, hook):
        """ Register a function to be called from the child process after
        forking. The hook is called with the subcommand as its argument. """
        cls.fork_hooks.append(hook)


    @classmethod
    def register_join_hook(cls, hook):
        """ Register a function to be called from the child process
        just before the child process terminates (joins to the parent).
        The hook is called with the subcommand as its argument. """
        cls.join_hooks.append(hook)


    def redirect_output(self):
        """ Tee output into self.debug through the module-level
        logging_manager_object, tagged with the basename of self.subdir.
        Does nothing without a subdir or without a logging manager. """
        if self.subdir and logging_manager_object:
            tag = os.path.basename(self.subdir)
            logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)


    @staticmethod
    def _pickle_exception(exc):
        """ Return exc pickled; if exc itself cannot be pickled, return a
        pickled RuntimeError that carries its repr instead. """
        try:
            return pickle.dumps(exc)
        except Exception:
            return pickle.dumps(
                    RuntimeError('unpicklable exception: %r' % (exc,)))


    def _run_child(self, w):
        """ Child side only: run the fork hooks and the function, write the
        pickled result (or the pickled exception) to file descriptor w and
        return the exit code to use: 0 on success, 1 on failure. """
        try:
            for hook in self.fork_hooks:
                hook(self)
            result = self.lambda_function()
            # pickle.dumps() is evaluated before os.write(), so nothing has
            # been written yet if the result turns out to be unpicklable.
            os.write(w, pickle.dumps(result))
        except Exception:
            traceback.print_exc()
            os.write(w, self._pickle_exception(sys.exc_info()[1]))
            return 1
        return 0


    def fork_start(self):
        """ Fork.  In the parent, store the child's pid in self.pid and the
        read end of the result pipe in self.result_pickle, then return.
        The child runs the subcommand and never returns from here: it
        always ends in os._exit(). """
        # Flush first so buffered output is not emitted twice after fork.
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        try:
            self.pid = os.fork()
        except OSError:
            # Do not leak the pipe when no child could be created.
            os.close(r)
            os.close(w)
            raise

        if self.pid:                            # I am the parent
            os.close(w)
            # Binary mode: the pipe carries pickle data, not text.
            self.result_pickle = os.fdopen(r, 'rb')
            return

        # We are the child from this point on. Never return.
        exit_code = 1
        try:
            try:
                os.close(r)
                signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
                if self.subdir:
                    os.chdir(self.subdir)
                self.redirect_output()

                exit_code = self._run_child(w)

                # A failing join hook is reported below but does not alter
                # the exit code already determined by the function.
                for hook in self.join_hooks:
                    hook(self)
            except:
                # Deliberately bare: whatever gets here (including
                # SystemExit and KeyboardInterrupt) must not unwind into
                # the copy of the parent's stack this child inherited.
                traceback.print_exc()
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)


    def _handle_exitstatus(self, sts):
        """
        This is partially borrowed from subprocess.Popen.

        Decode the waitpid() status sts into self.returncode: the exit
        code, or minus the signal number if the child was killed by a
        signal.  For a non-zero returncode a failure report (followed by
        the contents of debug/autoserv.stderr, if that file exists) is
        printed and error.AutoservSubcommandError is raised.
        """
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

        if self.returncode != 0:
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("rc=%d" % self.returncode)
            print("")
            if self.debug:
                stderr_file = os.path.join(self.debug, 'autoserv.stderr')
                if os.path.exists(stderr_file):
                    stderr_log = open(stderr_file)
                    try:
                        # Lines keep their own newlines; copy them as-is.
                        for line in stderr_log:
                            sys.stdout.write(line)
                    finally:
                        stderr_log.close()
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)


    def poll(self):
        """
        This is borrowed from subprocess.Popen.

        Check, without blocking, whether the child has exited.  Returns
        self.returncode, which is still None while the child is running.
        Raises error.AutoservSubcommandError (through _handle_exitstatus)
        if the child is found to have failed.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self._handle_exitstatus(sts)
            except os.error:
                # Best effort, as in subprocess.Popen: a failing waitpid()
                # leaves returncode untouched.
                pass
        return self.returncode


    def wait(self):
        """
        This is borrowed from subprocess.Popen.

        Block until the child exits and return self.returncode.  Raises
        error.AutoservSubcommandError (through _handle_exitstatus) if the
        child failed.
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode


    def fork_waitfor(self, timeout=None):
        """ Wait for the child and return its returncode.

        Without a timeout (None or 0) this is wait().  With a timeout, in
        seconds, the child is polled once a second; if it has not exited
        when the time is up it is passed to utils.nuke_pid(), a timeout
        report is printed and None is returned.

        Raises error.AutoservSubcommandError if the child exited with a
        failure.
        """
        if not timeout:
            return self.wait()

        deadline = time.time() + timeout
        while time.time() <= deadline:
            self.poll()
            if self.returncode is not None:
                return self.returncode
            time.sleep(1)

        # NOTE(review): nothing here calls waitpid() after the kill -
        # confirm that utils.nuke_pid() does not leave a zombie behind.
        utils.nuke_pid(self.pid)
        print("subcommand failed pid %d" % self.pid)
        print("%s" % (self.func,))
        print("timeout after %ds" % timeout)
        print("")
        return None