blob: acaf4d77d0fa7f16cba082e52073178778f61cf2 [file] [log] [blame]
mblighb0fab822007-07-25 16:40:19 +00001__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
mblighbb421852008-03-11 22:36:16 +00003import sys, os, subprocess, traceback, time, utils, signal
mbligh3177be42008-01-15 20:36:54 +00004from common.error import *
mblighb0fab822007-07-25 16:40:19 +00005
6
def parallel(tasklist, timeout=None):
    """Run a set of predefined subcommands in parallel.

    Every task is started first, then each one is waited for in list
    order.

    tasklist -- objects providing fork_start() and fork_waitfor(timeout)
    timeout  -- overall time budget in seconds shared by all the tasks;
                a false value (None or 0) waits without any time limit

    Raises AutoservError if any task raises AutoservSubcommandError or
    returns anything other than 0 from fork_waitfor().
    """
    for task in tasklist:
        task.fork_start()

    remaining_timeout = None
    if timeout:
        endtime = time.time() + timeout

    error = False
    for task in tasklist:
        if timeout:
            # Never hand out less than one second, even once the overall
            # deadline has already passed.
            remaining_timeout = max(endtime - time.time(), 1)
        try:
            status = task.fork_waitfor(remaining_timeout)
        except AutoservSubcommandError:
            error = True
        else:
            if status != 0:
                error = True

    # Failures are only reported after every task has been waited for.
    if error:
        raise AutoservError('One or more subcommands failed')
mblighb0fab822007-07-25 16:40:19 +000031
32
def parallel_simple(function, arglist, log=True, timeout=None):
    """Call function once per element of arglist, in parallel.

    Each element becomes the single argument of one subcommand object;
    when log is true, str(element) is also used as that subcommand's
    subdir name.  The subcommand objects are then handed to parallel()
    together with timeout.

    A one-element arglist is special: function is simply called in the
    current process, and log and timeout are not used.
    """
    # Bypass the forking machinery entirely for a single machine.
    if len(arglist) == 1:
        function(arglist[0])
        return

    def _subdir_for(arg):
        # Only logged runs get a per-argument subdirectory.
        if log:
            return str(arg)
        return None

    tasks = [subcommand(function, [arg], _subdir_for(arg))
             for arg in arglist]
    parallel(tasks, timeout)
mblighb0fab822007-07-25 16:40:19 +000054
55
def _where_art_thy_filehandles():
    """Debugging aid: append an "ls -l" listing of this process's
    /proc/<pid>/fd directory to /dev/tty."""
    listing_command = "ls -l /proc/%d/fd >> /dev/tty" % os.getpid()
    os.system(listing_command)
58
mblighdc735a22007-08-02 16:54:37 +000059
def _print_to_tty(string):
    """Write string, followed by a newline, to /dev/tty.

    The terminal is opened for every call and is always closed again,
    even if the write fails.
    """
    tty = open('/dev/tty', 'w')
    try:
        tty.write(string + '\n')
    finally:
        tty.close()
62
63
mblighcee25b12007-08-31 08:53:05 +000064def _redirect_stream(fd, output):
mblighb0fab822007-07-25 16:40:19 +000065 newfd = os.open(output, os.O_WRONLY | os.O_CREAT)
66 os.dup2(newfd, fd)
67 os.close(newfd)
68 if fd == 1:
69 sys.stdout = os.fdopen(fd, 'w')
70 if fd == 2:
71 sys.stderr = os.fdopen(fd, 'w')
72
73
mblighcee25b12007-08-31 08:53:05 +000074def _redirect_stream_tee(fd, output, tag):
mblighb0fab822007-07-25 16:40:19 +000075 """Use the low-level fork & pipe operations here to get a fd,
76 not a filehandle. This ensures that we get both the
77 filehandle and fd for stdout/stderr redirected correctly."""
78 r, w = os.pipe()
79 pid = os.fork()
80 if pid: # Parent
81 os.dup2(w, fd)
82 os.close(r)
83 os.close(w)
84 if fd == 1:
85 sys.stdout = os.fdopen(fd, 'w', 1)
86 if fd == 2:
87 sys.stderr = os.fdopen(fd, 'w', 1)
88 return
89 else: # Child
90 os.close(w)
91 log = open(output, 'w')
92 f = os.fdopen(r, 'r')
93 for line in iter(f.readline, ''):
94 # Tee straight to file
95 log.write(line)
96 log.flush()
97 # Prepend stdout with the tag
98 print tag + ' : ' + line,
99 sys.stdout.flush()
100 log.close()
101 os._exit(0)
102
103
class subcommand:
    """A call of func(*args) that is executed in a forked child process.

    fork_start() forks the child and fork_waitfor() collects its status.
    When a subdir is given, the child changes into it and its output is
    logged to <subdir>/debug/stdout and <subdir>/debug/stderr.
    """

    def __init__(self, func, args, subdir=None, stdprint=True):
        # func(args) - the subcommand to run
        # subdir     - the subdirectory to log results in; it and its
        #              'debug' subdirectory are created here if missing
        # stdprint   - whether to print results to stdout/stderr
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
            self.stdout = os.path.join(self.debug, 'stdout')
            self.stderr = os.path.join(self.debug, 'stderr')
        else:
            self.subdir = None
            self.debug = '/dev/null'
            self.stdout = '/dev/null'
            self.stderr = '/dev/null'

        self.func = func
        self.args = args
        self.lambda_function = lambda: func(*args)
        self.pid = None  # set by fork_start()
        self.stdprint = stdprint


    def redirect_output(self):
        """Redirect fds 1 and 2 of the current process.

        With stdprint set, output is teed to the log files and echoed
        with the subdir's basename as tag, but only when there is a
        subdir; otherwise nothing is redirected.  With stdprint unset,
        output goes only to self.stdout and self.stderr.
        """
        if self.stdprint:
            if self.subdir:
                tag = os.path.basename(self.subdir)
                _redirect_stream_tee(1, self.stdout, tag)
                _redirect_stream_tee(2, self.stderr, tag)
        else:
            _redirect_stream(1, self.stdout)
            _redirect_stream(2, self.stderr)


    def fork_start(self):
        """Fork a child that runs the subcommand.

        The parent returns immediately with self.pid set to the child's
        pid.  The child never returns: it exits with status 0 when the
        function completes and with status 1, after printing a
        traceback, when anything raises.
        """
        # Flush first so buffered output is not duplicated in the child.
        sys.stdout.flush()
        sys.stderr.flush()
        self.pid = os.fork()

        if self.pid:  # I am the parent
            return

        # We are the child from this point on. Never return.
        # The setup steps sit inside the try as well: if chdir or the
        # redirection failed outside of it, the child would fall back
        # into the parent's code and carry on as a second copy of it.
        exit_status = 1
        try:
            try:
                signal.signal(signal.SIGTERM, signal.SIG_DFL)  # clear handler
                if self.subdir:
                    os.chdir(self.subdir)
                self.redirect_output()

                self.lambda_function()

                sys.stdout.flush()
                sys.stderr.flush()
                exit_status = 0
            except:
                # Deliberately catches everything, SystemExit included.
                traceback.print_exc()
                sys.stdout.flush()
                sys.stderr.flush()
        finally:
            os._exit(exit_status)


    def fork_waitfor(self, timeout=None):
        """Wait for the child started by fork_start().

        timeout -- seconds to wait; a false value (None or 0) blocks
                   until the child is done, otherwise the child is
                   polled about once per second

        Returns 0 when the child's wait status is 0.  On timeout the
        child's pid is handed to utils.nuke_pid() (presumably killing
        it -- confirm in utils), a report is printed and None is
        returned.

        Raises AutoservSubcommandError(func, status) for a non-zero
        status, after printing a report that includes the contents of
        the stderr log.  The status is the raw value from os.waitpid(),
        not a decoded exit code.
        """
        if not timeout:
            (pid, status) = os.waitpid(self.pid, 0)
        else:
            pid = None
            start_time = time.time()
            while time.time() <= start_time + timeout:
                (pid, status) = os.waitpid(self.pid, os.WNOHANG)
                if pid:
                    break
                time.sleep(1)

            if not pid:
                utils.nuke_pid(self.pid)
                print("subcommand failed pid %d" % self.pid)
                print("%s" % (self.func,))
                print("timeout after %ds" % timeout)
                print("")
                return None

        if status != 0:
            print("subcommand failed pid %d" % pid)
            print("%s" % (self.func,))
            print("rc=%d" % status)
            print("")
            if os.path.exists(self.stderr):
                # Close the log explicitly instead of leaking the handle.
                stderr_log = open(self.stderr)
                try:
                    for line in stderr_log:
                        sys.stdout.write(line)
                finally:
                    stderr_log.close()
            print("\n--------------------------------------------\n")
            raise AutoservSubcommandError(self.func, status)
        return status