blob: 63d7ca5c0e937005e2092ca07b7bb8241d01be85 [file] [log] [blame]
mblighb0fab822007-07-25 16:40:19 +00001__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
mbligh589bf322008-05-27 21:28:15 +00003import sys, os, subprocess, traceback, time, signal
jadmanskid93d7d22008-05-29 21:37:29 +00004
5from autotest_lib.client.common_lib import error, utils
mblighb0fab822007-07-25 16:40:19 +00006
7
mblighc3aee0f2008-01-17 16:26:39 +00008def parallel(tasklist, timeout=None):
jadmanski0afbb632008-06-06 21:10:57 +00009 """Run an set of predefined subcommands in parallel"""
10 pids = []
11 run_error = False
12 for task in tasklist:
13 task.fork_start()
mblighc3aee0f2008-01-17 16:26:39 +000014
jadmanski0afbb632008-06-06 21:10:57 +000015 remaining_timeout = None
16 if timeout:
17 endtime = time.time() + timeout
mblighc3aee0f2008-01-17 16:26:39 +000018
jadmanski0afbb632008-06-06 21:10:57 +000019 for task in tasklist:
20 if timeout:
21 remaining_timeout = max(endtime - time.time(), 1)
22 try:
23 status = task.fork_waitfor(remaining_timeout)
24 except error.AutoservSubcommandError:
25 run_error = True
26 else:
27 if status != 0:
28 run_error = True
mbligh158ba7b2008-03-07 18:29:12 +000029
jadmanski0afbb632008-06-06 21:10:57 +000030 if run_error:
31 raise error.AutoservError('One or more subcommands failed')
mblighb0fab822007-07-25 16:40:19 +000032
33
mblighc3aee0f2008-01-17 16:26:39 +000034def parallel_simple(function, arglist, log=True, timeout=None):
jadmanski0afbb632008-06-06 21:10:57 +000035 """Each element in the arglist used to create a subcommand object,
36 where that arg is used both as a subdir name, and a single argument
37 to pass to "function".
38 We create a subcommand object for each element in the list,
39 then execute those subcommand objects in parallel."""
mblighdd3235b2008-01-14 16:44:19 +000040
jadmanski0afbb632008-06-06 21:10:57 +000041 # Bypass the multithreading if only one machine.
42 if len (arglist) == 1:
43 function(arglist[0])
44 return
45
46 subcommands = []
47 for arg in arglist:
48 args = [arg]
49 if log:
50 subdir = str(arg)
51 else:
52 subdir = None
53 subcommands.append(subcommand(function, args, subdir))
54 parallel(subcommands, timeout)
mblighb0fab822007-07-25 16:40:19 +000055
56
mblighcee25b12007-08-31 08:53:05 +000057def _where_art_thy_filehandles():
jadmanski0afbb632008-06-06 21:10:57 +000058 os.system("ls -l /proc/%d/fd >> /dev/tty" % os.getpid())
mblighb0fab822007-07-25 16:40:19 +000059
mblighdc735a22007-08-02 16:54:37 +000060
mblighcee25b12007-08-31 08:53:05 +000061def _print_to_tty(string):
jadmanski0afbb632008-06-06 21:10:57 +000062 open('/dev/tty', 'w').write(string + '\n')
mblighb0fab822007-07-25 16:40:19 +000063
64
mblighcee25b12007-08-31 08:53:05 +000065def _redirect_stream(fd, output):
jadmanski0afbb632008-06-06 21:10:57 +000066 newfd = os.open(output, os.O_WRONLY | os.O_CREAT)
67 os.dup2(newfd, fd)
68 os.close(newfd)
69 if fd == 1:
70 sys.stdout = os.fdopen(fd, 'w')
71 if fd == 2:
72 sys.stderr = os.fdopen(fd, 'w')
mblighb0fab822007-07-25 16:40:19 +000073
74
mblighcee25b12007-08-31 08:53:05 +000075def _redirect_stream_tee(fd, output, tag):
jadmanski0afbb632008-06-06 21:10:57 +000076 """Use the low-level fork & pipe operations here to get a fd,
77 not a filehandle. This ensures that we get both the
78 filehandle and fd for stdout/stderr redirected correctly."""
79 r, w = os.pipe()
80 pid = os.fork()
81 if pid: # Parent
82 os.dup2(w, fd)
83 os.close(r)
84 os.close(w)
85 if fd == 1:
86 sys.stdout = os.fdopen(fd, 'w', 1)
87 if fd == 2:
88 sys.stderr = os.fdopen(fd, 'w', 1)
89 return
90 else: # Child
jadmanskif4a87ca2008-09-16 17:31:48 +000091 signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
jadmanski0afbb632008-06-06 21:10:57 +000092 os.close(w)
93 log = open(output, 'w')
94 f = os.fdopen(r, 'r')
95 for line in iter(f.readline, ''):
96 # Tee straight to file
97 log.write(line)
98 log.flush()
99 # Prepend stdout with the tag
100 print tag + ' : ' + line,
101 sys.stdout.flush()
102 log.close()
103 os._exit(0)
mblighb0fab822007-07-25 16:40:19 +0000104
105
jadmanski550fdc22008-11-20 16:32:08 +0000106class subcommand(object):
107 fork_hooks, join_hooks = [], []
108
jadmanski0afbb632008-06-06 21:10:57 +0000109 def __init__(self, func, args, subdir = None, stdprint = True):
110 # func(args) - the subcommand to run
111 # subdir - the subdirectory to log results in
112 # stdprint - whether to print results to stdout/stderr
113 if subdir:
114 self.subdir = os.path.abspath(subdir)
115 if not os.path.exists(self.subdir):
116 os.mkdir(self.subdir)
117 self.debug = os.path.join(self.subdir, 'debug')
118 if not os.path.exists(self.debug):
119 os.mkdir(self.debug)
120 self.stdout = os.path.join(self.debug, 'stdout')
121 self.stderr = os.path.join(self.debug, 'stderr')
122 else:
123 self.subdir = None
124 self.debug = '/dev/null'
125 self.stdout = '/dev/null'
126 self.stderr = '/dev/null'
mblighd7685d32007-08-10 22:08:42 +0000127
jadmanski0afbb632008-06-06 21:10:57 +0000128 self.func = func
129 self.args = args
130 self.lambda_function = lambda: func(*args)
131 self.pid = None
132 self.stdprint = stdprint
showardc408c5e2009-01-08 23:30:53 +0000133 self.returncode = None
mblighb0fab822007-07-25 16:40:19 +0000134
135
jadmanski550fdc22008-11-20 16:32:08 +0000136 @classmethod
137 def register_fork_hook(cls, hook):
138 """ Register a function to be called from the child process after
139 forking. """
140 cls.fork_hooks.append(hook)
141
142
143 @classmethod
144 def register_join_hook(cls, hook):
145 """ Register a function to be called when from the child process
146 just before the child process terminates (joins to the parent). """
147 cls.join_hooks.append(hook)
148
149
jadmanski0afbb632008-06-06 21:10:57 +0000150 def redirect_output(self):
151 if self.stdprint:
152 if self.subdir:
153 tag = os.path.basename(self.subdir)
154 _redirect_stream_tee(1, self.stdout, tag)
155 _redirect_stream_tee(2, self.stderr, tag)
156 else:
157 _redirect_stream(1, self.stdout)
158 _redirect_stream(2, self.stderr)
mblighb0fab822007-07-25 16:40:19 +0000159
160
jadmanski0afbb632008-06-06 21:10:57 +0000161 def fork_start(self):
162 sys.stdout.flush()
163 sys.stderr.flush()
164 self.pid = os.fork()
mblighb0fab822007-07-25 16:40:19 +0000165
jadmanski0afbb632008-06-06 21:10:57 +0000166 if self.pid: # I am the parent
167 return
mblighb0fab822007-07-25 16:40:19 +0000168
jadmanski0afbb632008-06-06 21:10:57 +0000169 # We are the child from this point on. Never return.
170 signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
171 if self.subdir:
172 os.chdir(self.subdir)
173 self.redirect_output()
mblighb0fab822007-07-25 16:40:19 +0000174
jadmanski0afbb632008-06-06 21:10:57 +0000175 try:
jadmanski550fdc22008-11-20 16:32:08 +0000176 for hook in self.fork_hooks:
177 hook(self)
jadmanski0afbb632008-06-06 21:10:57 +0000178 self.lambda_function()
jadmanski0afbb632008-06-06 21:10:57 +0000179 except:
180 traceback.print_exc()
jadmanski550fdc22008-11-20 16:32:08 +0000181 exit_code = 1
182 else:
183 exit_code = 0
184
185 try:
186 for hook in self.join_hooks:
187 hook(self)
188 finally:
jadmanski0afbb632008-06-06 21:10:57 +0000189 sys.stdout.flush()
190 sys.stderr.flush()
jadmanski550fdc22008-11-20 16:32:08 +0000191 os._exit(exit_code)
mblighb0fab822007-07-25 16:40:19 +0000192
193
showardc408c5e2009-01-08 23:30:53 +0000194 def _handle_exitstatus(self, sts):
195 """
196 This is partially borrowed from subprocess.Popen.
197 """
198 if os.WIFSIGNALED(sts):
199 self.returncode = -os.WTERMSIG(sts)
200 elif os.WIFEXITED(sts):
201 self.returncode = os.WEXITSTATUS(sts)
jadmanski0afbb632008-06-06 21:10:57 +0000202 else:
showardc408c5e2009-01-08 23:30:53 +0000203 # Should never happen
204 raise RuntimeError("Unknown child exit status!")
mblighc3aee0f2008-01-17 16:26:39 +0000205
showardc408c5e2009-01-08 23:30:53 +0000206 if self.returncode != 0:
207 print "subcommand failed pid %d" % self.pid
jadmanski0afbb632008-06-06 21:10:57 +0000208 print "%s" % (self.func,)
showardc408c5e2009-01-08 23:30:53 +0000209 print "rc=%d" % self.returncode
jadmanski0afbb632008-06-06 21:10:57 +0000210 print
211 if os.path.exists(self.stderr):
212 for line in open(self.stderr).readlines():
213 print line,
214 print "\n--------------------------------------------\n"
showardc408c5e2009-01-08 23:30:53 +0000215 raise error.AutoservSubcommandError(self.func, self.returncode)
216
217
218 def poll(self):
219 """
220 This is borrowed from subprocess.Popen.
221 """
222 if self.returncode is None:
223 try:
224 pid, sts = os.waitpid(self.pid, os.WNOHANG)
225 if pid == self.pid:
226 self._handle_exitstatus(sts)
227 except os.error:
228 pass
229 return self.returncode
230
231
232 def wait(self):
233 """
234 This is borrowed from subprocess.Popen.
235 """
236 if self.returncode is None:
237 pid, sts = os.waitpid(self.pid, 0)
238 self._handle_exitstatus(sts)
239 return self.returncode
240
241
242 def fork_waitfor(self, timeout=None):
243 if not timeout:
244 return self.wait()
245 else:
246 start_time = time.time()
247 while time.time() <= start_time + timeout:
248 self.poll()
249 if self.returncode is not None:
250 return self.returncode
251 time.sleep(1)
252
253 utils.nuke_pid(self.pid)
254 print "subcommand failed pid %d" % self.pid
255 print "%s" % (self.func,)
256 print "timeout after %ds" % timeout
257 print
258 return None